From 78b1fbb87c49d4c7da0dfdefce608d219917c293 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Fri, 7 Jun 2024 11:45:35 +0200 Subject: [PATCH 1/3] fix abort function for iworkflow --- main/lib/idds/agents/carrier/finisher.py | 127 ++++++++++++-- main/lib/idds/agents/carrier/iutils.py | 30 ++++ main/lib/idds/agents/carrier/plugins/panda.py | 33 ++++ main/lib/idds/agents/carrier/poller.py | 2 +- main/lib/idds/agents/clerk/clerk.py | 98 +++++++++-- .../idds/agents/transformer/transformer.py | 162 ++++++++++++++---- 6 files changed, 387 insertions(+), 65 deletions(-) diff --git a/main/lib/idds/agents/carrier/finisher.py b/main/lib/idds/agents/carrier/finisher.py index 5ab39859..63ac1aa2 100644 --- a/main/lib/idds/agents/carrier/finisher.py +++ b/main/lib/idds/agents/carrier/finisher.py @@ -28,7 +28,7 @@ handle_resume_processing, # is_process_terminated, sync_processing) -from .iutils import sync_iprocessing +from .iutils import sync_iprocessing, handle_abort_iprocessing, handle_resume_iprocessing from .poller import Poller setup_logging(__name__) @@ -346,6 +346,43 @@ def handle_abort_processing(self, processing, log_prefix=""): return ret return None + def handle_abort_iprocessing(self, processing, log_prefix=""): + """ + process abort processing + """ + try: + plugin = None + if processing['processing_type']: + plugin_name = processing['processing_type'].name.lower() + '_poller' + plugin = self.get_plugin(plugin_name) + else: + raise exceptions.ProcessSubmitFailed('No corresponding poller plugins for %s' % processing['processing_type']) + + processing_status, update_collections, update_contents, messages = handle_abort_iprocessing(processing, self.agent_attributes, plugin=plugin, logger=self.logger, log_prefix=log_prefix) + + update_processing = {'processing_id': processing['processing_id'], + 'parameters': {'status': processing_status, + 'substatus': ProcessingStatus.ToCancel, + 'locking': ProcessingLocking.Idle}} + ret = {'update_processing': update_processing, + 'update_collections': update_collections, + 'update_contents': update_contents, + 'messages': messages + } + return ret + except Exception as ex: + self.logger.error(ex) + self.logger.error(traceback.format_exc()) + error = {'abort_err': {'msg': truncate_string('%s' % (ex), length=200)}} + update_processing = {'processing_id': processing['processing_id'], + 'parameters': {'status': ProcessingStatus.ToCancel, + 'locking': ProcessingLocking.Idle, + 'errors': processing['errors'] if processing['errors'] else {}}} + update_processing['parameters']['errors'].update(error) + ret = {'update_processing': update_processing} + return ret + return None + def process_abort_processing(self, event): self.number_workers += 1 pro_ret = ReturnCode.Ok.value @@ -373,17 +410,27 @@ def process_abort_processing(self, event): self.logger.info(log_pre + "process_abort_processing result: %s" % str(ret)) self.update_processing(ret, pr) elif pr: - ret = self.handle_abort_processing(pr, log_prefix=log_pre) - ret_copy = {} - for ret_key in ret: - if ret_key != 'messages': - ret_copy[ret_key] = ret[ret_key] - self.logger.info(log_pre + "process_abort_processing result: %s" % str(ret_copy)) + if pr['processing_type'] and pr['processing_type'] in [ProcessingType.iWorkflow, ProcessingType.iWork]: + ret = self.handle_abort_iprocessing(pr, log_prefix=log_pre) + self.logger.info(log_pre + "process_abort_processing result: %s" % str(ret)) - self.update_processing(ret, pr) - self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % pr['transform_id']) - event = UpdateTransformEvent(publisher_id=self.id, transform_id=pr['transform_id'], content=event._content) - self.event_bus.send(event) + self.update_processing(ret, pr, use_bulk_update_mappings=False) + + self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % pr['transform_id']) + event = UpdateTransformEvent(publisher_id=self.id, transform_id=pr['transform_id'], content=event._content) + self.event_bus.send(event) + else: + ret = self.handle_abort_processing(pr, log_prefix=log_pre) + ret_copy = {} + for ret_key in ret: + if ret_key != 'messages': + ret_copy[ret_key] = ret[ret_key] + self.logger.info(log_pre + "process_abort_processing result: %s" % str(ret_copy)) + + self.update_processing(ret, pr) + self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % pr['transform_id']) + event = UpdateTransformEvent(publisher_id=self.id, transform_id=pr['transform_id'], content=event._content) + self.event_bus.send(event) except Exception as ex: self.logger.error(ex) self.logger.error(traceback.format_exc()) @@ -419,6 +466,42 @@ def handle_resume_processing(self, processing, log_prefix=""): return ret return None + def handle_resume_iprocessing(self, processing, log_prefix=""): + """ + process resume processing + """ + try: + plugin = None + if processing['processing_type']: + plugin_name = processing['processing_type'].name.lower() + '_poller' + plugin = self.get_plugin(plugin_name) + else: + raise exceptions.ProcessSubmitFailed('No corresponding poller plugins for %s' % processing['processing_type']) + + processing_status, update_collections, update_contents = handle_resume_iprocessing(processing, self.agent_attributes, plugin=plugin, logger=self.logger, log_prefix=log_prefix) + + update_processing = {'processing_id': processing['processing_id'], + 'parameters': {'status': processing_status, + 'substatus': ProcessingStatus.ToResume, + 'locking': ProcessingLocking.Idle}} + ret = {'update_processing': update_processing, + 'update_collections': update_collections, + 'update_contents': update_contents, + } + return ret + except Exception as ex: + self.logger.error(ex) + self.logger.error(traceback.format_exc()) + error = {'resume_err': {'msg': truncate_string('%s' % (ex), length=200)}} + update_processing = {'processing_id': processing['processing_id'], + 'parameters': {'status': ProcessingStatus.ToResume, + 'locking': ProcessingLocking.Idle, + 'errors': processing['errors'] if processing['errors'] else {}}} + update_processing['parameters']['errors'].update(error) + ret = {'update_processing': update_processing} + return ret + return None + def process_resume_processing(self, event): self.number_workers += 1 pro_ret = ReturnCode.Ok.value @@ -445,14 +528,24 @@ def process_resume_processing(self, event): self.update_processing(ret, pr) elif pr: - ret = self.handle_resume_processing(pr, log_prefix=log_pre) - self.logger.info(log_pre + "process_resume_processing result: %s" % str(ret)) + if pr['processing_type'] and pr['processing_type'] in [ProcessingType.iWorkflow, ProcessingType.iWork]: + ret = self.handle_resume_iprocessing(pr, log_prefix=log_pre) + self.logger.info(log_pre + "process_resume_processing result: %s" % str(ret)) - self.update_processing(ret, pr, use_bulk_update_mappings=False) + self.update_processing(ret, pr, use_bulk_update_mappings=False) - self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % pr['transform_id']) - event = UpdateTransformEvent(publisher_id=self.id, transform_id=pr['transform_id'], content=event._content) - self.event_bus.send(event) + self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % pr['transform_id']) + event = UpdateTransformEvent(publisher_id=self.id, transform_id=pr['transform_id'], content=event._content) + self.event_bus.send(event) + else: + ret = self.handle_resume_processing(pr, log_prefix=log_pre) + self.logger.info(log_pre + "process_resume_processing result: %s" % str(ret)) + + self.update_processing(ret, pr, use_bulk_update_mappings=False) + + self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % pr['transform_id']) + event = UpdateTransformEvent(publisher_id=self.id, transform_id=pr['transform_id'], content=event._content) + self.event_bus.send(event) except Exception as ex: self.logger.error(ex) self.logger.error(traceback.format_exc()) diff --git a/main/lib/idds/agents/carrier/iutils.py b/main/lib/idds/agents/carrier/iutils.py index 1daa935a..0eff63d5 100644 --- a/main/lib/idds/agents/carrier/iutils.py +++ b/main/lib/idds/agents/carrier/iutils.py @@ -76,6 +76,36 @@ def handle_update_iprocessing(processing, agent_attributes, plugin=None, max_upd return status, [], [], [], [], [], [], [] +def handle_abort_iprocessing(processing, agent_attributes, plugin=None, logger=None, log_prefix=''): + logger = get_logger(logger) + + workload_id = processing['workload_id'] + + try: + status = plugin.abort(workload_id, logger=logger, log_prefix=log_prefix) + logger.info(log_prefix + "abort work (status: %s, workload_id: %s)" % (status, workload_id)) + except Exception as ex: + err_msg = "abort work failed with exception: %s" % (ex) + logger.error(log_prefix + err_msg) + raise Exception(err_msg) + return status, [], [], [] + + +def handle_resume_iprocessing(processing, agent_attributes, plugin=None, logger=None, log_prefix=''): + logger = get_logger(logger) + + workload_id = processing['workload_id'] + + try: + status = plugin.resume(workload_id, logger=logger, log_prefix=log_prefix) + logger.info(log_prefix + "resume work (status: %s, workload_id: %s)" % (status, workload_id)) + except Exception as ex: + err_msg = "resume work failed with exception: %s" % (ex) + logger.error(log_prefix + err_msg) + raise Exception(err_msg) + return status, [], [] + + def sync_iprocessing(processing, agent_attributes, terminate=False, abort=False, logger=None, log_prefix=""): # logger = get_logger() diff --git a/main/lib/idds/agents/carrier/plugins/panda.py b/main/lib/idds/agents/carrier/plugins/panda.py index c2110056..7ab32fda 100644 --- a/main/lib/idds/agents/carrier/plugins/panda.py +++ b/main/lib/idds/agents/carrier/plugins/panda.py @@ -159,3 +159,36 @@ def poll(self, workload_id, logger=None, log_prefix=''): logger.error(log_prefix + str(ex)) logger.error(traceback.format_exc()) raise ex + + def abort(self, workload_id, logger=None, log_prefix=''): + from pandaclient import Client + + try: + if logger: + logger.info(log_prefix + f"aborting task {workload_id}") + Client.killTask(workload_id, soft=True) + status, task_status = Client.getTaskStatus(workload_id) + if status == 0: + return self.get_processing_status(task_status) + else: + msg = "Failed to abort task %s: status: %s, task_status: %s" % (workload_id, status, task_status) + raise Exception(msg) + except Exception as ex: + if logger: + logger.error(log_prefix + str(ex)) + logger.error(traceback.format_exc()) + raise ex + + def resume(self, workload_id, logger=None, log_prefix=''): + from pandaclient import Client + + try: + if logger: + logger.info(log_prefix + f"resuming task {workload_id}") + status, out = Client.retryTask(workload_id, newParams={}) + return ProcessingStatus.Running + except Exception as ex: + if logger: + logger.error(log_prefix + str(ex)) + logger.error(traceback.format_exc()) + raise ex diff --git a/main/lib/idds/agents/carrier/poller.py b/main/lib/idds/agents/carrier/poller.py index 86001cba..7ba594d6 100644 --- a/main/lib/idds/agents/carrier/poller.py +++ b/main/lib/idds/agents/carrier/poller.py @@ -430,7 +430,7 @@ def handle_update_iprocessing(self, processing): plugin_name = processing['processing_type'].name.lower() + '_poller' plugin = self.get_plugin(plugin_name) else: - raise exceptions.ProcessSubmitFailed('No corresponding submitter plugins for %s' % processing['processing_type']) + raise exceptions.ProcessSubmitFailed('No corresponding poller plugins for %s' % processing['processing_type']) ret_handle_update_processing = handle_update_iprocessing(processing, self.agent_attributes, diff --git a/main/lib/idds/agents/clerk/clerk.py b/main/lib/idds/agents/clerk/clerk.py index 63d73cf0..2d67d7ed 100644 --- a/main/lib/idds/agents/clerk/clerk.py +++ b/main/lib/idds/agents/clerk/clerk.py @@ -1387,13 +1387,11 @@ def process_abort_request(self, event): self.update_request(ret) self.handle_command(event, cmd_status=CommandStatus.Failed, errors="Request is already terminated. Cannot be aborted") elif req['request_type'] in [RequestType.iWorkflow, RequestType.iWorkflowLocal]: - # todo - ret = {'request_id': req['request_id'], - 'parameters': {'locking': RequestLocking.Idle, - 'status': req['status']} - } + ret = self.handle_close_irequest(req, event=event) self.update_request(ret) - self.handle_command(event, cmd_status=CommandStatus.Failed, errors="Not support abortion for iWorkflow") + + # self.handle_command(event, cmd_status=CommandStatus.Failed, errors="Not support abortion for iWorkflow") + self.handle_command(event, cmd_status=CommandStatus.Processed, errors=None) else: ret = self.handle_abort_request(req, event) self.logger.info(log_pre + "process_abort_request result: %s" % str(ret)) @@ -1488,6 +1486,7 @@ def handle_close_irequest(self, req, event): self.logger.debug(log_msg) parameters = {'status': req_status, + 'substatus': RequestStatus.ToClose, 'locking': RequestLocking.Idle, 'request_metadata': req['request_metadata'] } @@ -1540,7 +1539,42 @@ def process_close_request(self, event): self.update_request(ret) else: pass - # todo + ret = self.handle_abort_request(req, event) + self.logger.info(log_pre + "process_abort_request result: %s" % str(ret)) + self.update_request(ret) + to_abort_transform_id = None + if event and event._content and event._content['cmd_content'] and 'transform_id' in event._content['cmd_content']: + to_abort_transform_id = event._content['cmd_content']['transform_id'] + + if req['status'] in [RequestStatus.Building]: + wf = req['request_metadata']['build_workflow'] + else: + if 'workflow' in req['request_metadata']: + wf = req['request_metadata']['workflow'] + else: + wf = req['request_metadata']['build_workflow'] + works = wf.get_all_works() + if works: + has_abort_work = False + for work in works: + if (work.is_started() or work.is_starting()) and not work.is_terminated(): + if not to_abort_transform_id or to_abort_transform_id == work.get_work_id(): + self.logger.info(log_pre + "AbortTransformEvent(transform_id: %s)" % str(work.get_work_id())) + event = AbortTransformEvent(publisher_id=self.id, + transform_id=work.get_work_id(), + content=event._content) + self.event_bus.send(event) + has_abort_work = True + if not has_abort_work: + self.logger.info(log_pre + "not has abort work") + self.logger.info(log_pre + "UpdateRequestEvent(request_id: %s)" % str(req['request_id'])) + event = UpdateRequestEvent(publisher_id=self.id, request_id=req['request_id'], content=event._content) + self.event_bus.send(event) + else: + # no works. should trigger update request + self.logger.info(log_pre + "UpdateRequestEvent(request_id: %s)" % str(req['request_id'])) + event = UpdateRequestEvent(publisher_id=self.id, request_id=req['request_id'], content=event._content) + self.event_bus.send(event) self.handle_command(event, cmd_status=CommandStatus.Processed, errors=None) except AssertionError as ex: @@ -1556,6 +1590,47 @@ def process_close_request(self, event): self.number_workers -= 1 return pro_ret + def handle_resume_irequest(self, req, event): + """ + process resume irequest + """ + try: + log_pre = self.get_log_prefix(req) + self.logger.info(log_pre + "handle_resume_irequest event: %s" % str(event)) + + tfs = core_transforms.get_transforms(request_id=req['request_id']) + for tf in tfs: + if tf['status'] in [TransformStatus.Finished, TransformStatus.Built]: + continue + else: + event = ResumeTransformEvent(publisher_id=self.id, + transform_id=tf['transform_id'], + content=event._content) + self.event_bus.send(event) + + parameters = {'status': RequestStatus.Transforming, + 'substatus': RequestStatus.ToResume, + 'locking': RequestLocking.Idle, + 'request_metadata': req['request_metadata'] + } + parameters = self.load_poll_period(req, parameters) + + ret = {'request_id': req['request_id'], + 'parameters': parameters} + self.logger.info(log_pre + "Handle resume irequest result: %s" % str(ret)) + return ret + except Exception as ex: + self.logger.error(ex) + self.logger.error(traceback.format_exc()) + error = {'close_err': {'msg': truncate_string('%s' % (ex), length=200)}} + ret_req = {'request_id': req['request_id'], + 'parameters': {'status': RequestStatus.ToClose, + 'locking': RequestLocking.Idle, + 'errors': req['errors'] if req['errors'] else {}}} + ret_req['parameters']['errors'].update(error) + self.logger.info(log_pre + "handle_close_irequest exception result: %s" % str(ret_req)) + return ret_req + def handle_resume_request(self, req): """ process resume request @@ -1610,13 +1685,10 @@ def process_resume_request(self, event): self.update_request(ret) self.handle_command(event, cmd_status=CommandStatus.Failed, errors="Request is already finished. Cannot be resumed") elif req['request_type'] in [RequestType.iWorkflow, RequestType.iWorkflowLocal]: - # todo - ret = {'request_id': req['request_id'], - 'parameters': {'locking': RequestLocking.Idle, - 'status': req['status']} - } + ret = self.handle_resume_irequest(req) self.update_request(ret) - self.handle_command(event, cmd_status=CommandStatus.Failed, errors="Not support to reusme for iWorkflow") + # self.handle_command(event, cmd_status=CommandStatus.Failed, errors="Not support to reusme for iWorkflow") + self.handle_command(event, cmd_status=CommandStatus.Processed, errors=None) else: ret = self.handle_resume_request(req) self.logger.info(log_pre + "process_resume_request result: %s" % str(ret)) diff --git a/main/lib/idds/agents/transformer/transformer.py b/main/lib/idds/agents/transformer/transformer.py index ff8c00c2..da6a014d 100644 --- a/main/lib/idds/agents/transformer/transformer.py +++ b/main/lib/idds/agents/transformer/transformer.py @@ -814,6 +814,47 @@ def handle_abort_transform(self, transform): return ret return None + def handle_abort_itransform(self, transform, event): + """ + process abort transform + """ + try: + log_pre = self.get_log_prefix(transform) + + self.logger.info(log_pre + "handle_abort_itransform: %s" % transform) + prs = core_processings.get_processings(transform_id=transform['transform_id']) + pr_found = None + for pr in prs: + if pr['processing_id'] == transform['current_processing_id']: + pr_found = pr + break + if pr_found: + self.logger.info(log_pre + "AbortProcessingEvent(processing_id: %s)" % pr['processing_id']) + event = AbortProcessingEvent(publisher_id=self.id, + processing_id=pr['processing_id'], + content=event._content) + self.event_bus.send(event) + + transform_parameters = {'status': TransformStatus.Transforming, + 'substatus': TransformStatus.ToCancel, + 'locking': TransformLocking.Idle} + + ret = {'transform': transform, + 'transform_parameters': transform_parameters} + return ret + except Exception as ex: + self.logger.error(ex) + self.logger.error(traceback.format_exc()) + tf_status = transform['oldstatus'] + error = {'resume_err': {'msg': truncate_string('%s' % (ex), length=200)}} + transform_parameters = {'status': tf_status, + 'locking': TransformLocking.Idle, + 'errors': transform['errors'] if transform['errors'] else {}} + transform_parameters['errors'].update(error) + ret = {'transform': transform, 'transform_parameters': transform_parameters} + return ret + return None + def process_abort_transform(self, event): self.number_workers += 1 pro_ret = ReturnCode.Ok.value @@ -841,20 +882,26 @@ def process_abort_transform(self, event): self.update_transform(ret) else: - ret = self.handle_abort_transform(tf) - self.logger.info(log_pre + "process_abort_transform result: %s" % str(ret)) - if ret: - self.update_transform(ret) - - work = tf['transform_metadata']['work'] - work.set_work_id(tf['transform_id']) - work.set_agent_attributes(self.agent_attributes, tf) - - processing = work.get_processing(input_output_maps=[], without_creating=True) - if processing and processing.processing_id: - self.logger.info(log_pre + "AbortProcessingEvent(processing_id: %s)" % processing.processing_id) - event = AbortProcessingEvent(publisher_id=self.id, processing_id=processing.processing_id, content=event._content) - self.event_bus.send(event) + if tf['transform_type'] in [TransformType.iWorkflow, TransformType.iWork]: + ret = self.handle_abort_itransform(tf, event) + self.logger.info(log_pre + "process_abort_transform result: %s" % str(ret)) + if ret: + self.update_transform(ret) + else: + ret = self.handle_abort_transform(tf) + self.logger.info(log_pre + "process_abort_transform result: %s" % str(ret)) + if ret: + self.update_transform(ret) + + work = tf['transform_metadata']['work'] + work.set_work_id(tf['transform_id']) + work.set_agent_attributes(self.agent_attributes, tf) + + processing = work.get_processing(input_output_maps=[], without_creating=True) + if processing and processing.processing_id: + self.logger.info(log_pre + "AbortProcessingEvent(processing_id: %s)" % processing.processing_id) + event = AbortProcessingEvent(publisher_id=self.id, processing_id=processing.processing_id, content=event._content) + self.event_bus.send(event) except Exception as ex: self.logger.error(ex) self.logger.error(traceback.format_exc()) @@ -890,6 +937,47 @@ def handle_resume_transform(self, transform): return ret return None + def handle_resume_itransform(self, transform, event): + """ + process resume transform + """ + try: + log_pre = self.get_log_prefix(transform) + + self.logger.info(log_pre + "handle_resume_itransform: %s" % transform) + prs = core_processings.get_processings(transform_id=transform['transform_id']) + pr_found = None + for pr in prs: + if pr['processing_id'] == transform['current_processing_id']: + pr_found = pr + break + if pr_found: + self.logger.info(log_pre + "ResumeProcessingEvent(processing_id: %s)" % pr['processing_id']) + event = ResumeProcessingEvent(publisher_id=self.id, + processing_id=pr['processing_id'], + content=event._content) + self.event_bus.send(event) + + transform_parameters = {'status': TransformStatus.Transforming, + 'substatus': TransformStatus.ToResume, + 'locking': TransformLocking.Idle} + + ret = {'transform': transform, + 'transform_parameters': transform_parameters} + return ret + except Exception as ex: + self.logger.error(ex) + self.logger.error(traceback.format_exc()) + tf_status = transform['oldstatus'] + error = {'resume_err': {'msg': truncate_string('%s' % (ex), length=200)}} + transform_parameters = {'status': tf_status, + 'locking': TransformLocking.Idle, + 'errors': transform['errors'] if transform['errors'] else {}} + transform_parameters['errors'].update(error) + ret = {'transform': transform, 'transform_parameters': transform_parameters} + return ret + return None + def process_resume_transform(self, event): self.number_workers += 1 pro_ret = ReturnCode.Ok.value @@ -913,27 +1001,33 @@ def process_resume_transform(self, event): self.logger.info(log_pre + "process_resume_transform result: %s" % str(ret)) self.update_transform(ret) else: - ret = self.handle_resume_transform(tf) - self.logger.info(log_pre + "process_resume_transform result: %s" % str(ret)) - if ret: - self.update_transform(ret) - - work = tf['transform_metadata']['work'] - work.set_agent_attributes(self.agent_attributes, tf) - - processing = work.get_processing(input_output_maps=[], without_creating=True) - if processing and processing.processing_id: - self.logger.info(log_pre + "ResumeProcessingEvent(processing_id: %s)" % processing.processing_id) - event = ResumeProcessingEvent(publisher_id=self.id, - processing_id=processing.processing_id, - content=event._content) - self.event_bus.send(event) + if tf['transform_type'] in [TransformType.iWorkflow, TransformType.iWork]: + ret = self.handle_resume_itransform(tf, event) + self.logger.info(log_pre + "process_resume_transform result: %s" % str(ret)) + if ret: + self.update_transform(ret) else: - self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % tf['transform_id']) - event = UpdateTransformEvent(publisher_id=self.id, - transform_id=tf['transform_id'], - content=event._content) - self.event_bus.send(event) + ret = self.handle_resume_transform(tf) + self.logger.info(log_pre + "process_resume_transform result: %s" % str(ret)) + if ret: + self.update_transform(ret) + + work = tf['transform_metadata']['work'] + work.set_agent_attributes(self.agent_attributes, tf) + + processing = work.get_processing(input_output_maps=[], without_creating=True) + if processing and processing.processing_id: + self.logger.info(log_pre + "ResumeProcessingEvent(processing_id: %s)" % processing.processing_id) + event = ResumeProcessingEvent(publisher_id=self.id, + processing_id=processing.processing_id, + content=event._content) + self.event_bus.send(event) + else: + self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % tf['transform_id']) + event = UpdateTransformEvent(publisher_id=self.id, + transform_id=tf['transform_id'], + content=event._content) + self.event_bus.send(event) except Exception as ex: self.logger.error(ex) self.logger.error(traceback.format_exc()) From a18af02abef93a43e83dd265be3b8cb3a27604d0 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Fri, 7 Jun 2024 11:46:00 +0200 Subject: [PATCH 2/3] improve iworkflow to handle long arguments --- common/lib/idds/common/event.py | 9 ++ main/etc/sql/postgresql.sql | 6 +- main/lib/idds/tests/panda_test.py | 7 +- main/lib/idds/tests/test_domapanda.py | 12 +++ main/lib/idds/tests/test_domapanda_big.py | 12 +++ .../idds/tests/test_domapanda_pandaclient.py | 14 ++- main/tools/env/install_idds.sh | 2 + main/tools/env/setup_panda.sh | 25 +++++ monitor/data/conf.js | 12 +-- workflow/bin/run_workflow | 45 ++++++--- workflow/lib/idds/iworkflow/base.py | 46 +++++++-- workflow/lib/idds/iworkflow/work.py | 94 +++++++++++++++---- workflow/lib/idds/iworkflow/workflow.py | 78 ++++++++++----- 13 files changed, 286 insertions(+), 76 deletions(-) diff --git a/common/lib/idds/common/event.py b/common/lib/idds/common/event.py index 8446df78..b598334c 100644 --- a/common/lib/idds/common/event.py +++ b/common/lib/idds/common/event.py @@ -88,6 +88,7 @@ def event_type(self): return self._event_type.name def able_to_merge(self, event): + """ if self._event_type == event._event_type and self.get_event_id() == event.get_event_id(): return True if self._event_type == event._event_type and self.get_event_id() == event.get_event_id() and self._counter == event._counter: @@ -98,6 +99,14 @@ def able_to_merge(self, event): # ddiff = DeepDiff(self._content, event._content, ignore_order=True) # if not ddiff: # return True + """ + if self._event_type == event._event_type: + if (self._content is None and event._content is None): + return True + elif (self._content is not None and event._content is not None): + ddiff = DeepDiff(self._content, event._content, ignore_order=True) + if not ddiff: + return True return False def changed(self): diff --git a/main/etc/sql/postgresql.sql b/main/etc/sql/postgresql.sql index 73cab2e8..11d77242 100644 --- a/main/etc/sql/postgresql.sql +++ b/main/etc/sql/postgresql.sql @@ -548,15 +548,15 @@ CREATE INDEX "CONTENTS_ID_NAME_IDX" ON doma_idds.contents (coll_id, scope, md5(' $$ LANGUAGE PLPGSQL -CREATE SEQUENCE doma_idds."METAINFO_ID_SEQ" START WITH 1 +CREATE SEQUENCE doma_idds."METAINFO_ID_SEQ" START WITH 1; CREATE TABLE meta_info ( meta_id BIGINT NOT NULL, - name VARCHAR2(50), + name VARCHAR(50), status INTEGER, created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL, updated_at TIMESTAMP WITHOUT TIME ZONE NOT NULL, - description VARCHAR2(1000), + description VARCHAR(1000), meta_info JSONB, CONSTRAINT "METAINFO_PK" PRIMARY KEY (meta_id), -- USING INDEX LOCAL, CONSTRAINT "METAINFO_NAME_UQ" UNIQUE (name) diff --git a/main/lib/idds/tests/panda_test.py b/main/lib/idds/tests/panda_test.py index 1111bb72..9aca3433 100644 --- a/main/lib/idds/tests/panda_test.py +++ b/main/lib/idds/tests/panda_test.py @@ -10,6 +10,9 @@ # os.environ['PANDA_URL'] = 'http://rubin-panda-server-dev.slac.stanford.edu:80/server/panda' # os.environ['PANDA_URL_SSL'] = 'https://rubin-panda-server-dev.slac.stanford.edu:8443/server/panda' +# os.environ['PANDA_URL_SSL'] = 'https://panda-doma-k8s-panda.cern.ch/server/panda' +# os.environ['PANDA_URL'] = 'http://panda-doma-k8s-panda.cern.ch:25080/server/panda' + # os.environ['PANDA_URL'] = 'https://usdf-panda-server.slac.stanford.edu:8443/server/panda' # os.environ['PANDA_URL_SSL'] = 'https://usdf-panda-server.slac.stanford.edu:8443/server/panda' @@ -65,7 +68,9 @@ task_ids = [169236, 169237, 169238, 169239, 169240, 169241] task_ids = [169272, 169273, 169312, 169313] task_ids = [169307, 169308, 169309, 169310, 169311, 169312, 169313, 169314] -task_ids = [i for i in range(169359, 169363)] +task_ids = [i for i in range(10147, 10150)] +task_ids = [30, 31, 34, 32, 33, 35] +task_ids = [169786, 169787] for task_id in task_ids: print("Killing %s" % task_id) ret = Client.killTask(task_id, verbose=True) diff --git a/main/lib/idds/tests/test_domapanda.py b/main/lib/idds/tests/test_domapanda.py index 2e97d848..370c53e0 100644 --- a/main/lib/idds/tests/test_domapanda.py +++ b/main/lib/idds/tests/test_domapanda.py @@ -61,6 +61,18 @@ task_queue4 = 'LANCS_Rubin_Merge' # task_queue5 = 'LANCS_Rubin_IO' task_queue5 = 'LANCS_Rubin_Extra_Himem' +elif len(sys.argv) > 1 and sys.argv[1] == "ral": + site = 'RAL' + task_cloud = 'EU' + # task_queue = 'RAL_TEST' + task_queue = 'RAL_Rubin' + task_queue1 = 'RAL_Rubin_Medium' + task_queue2 = 'RAL_Rubin_Himem' + task_queue3 = 'RAL_Rubin_Extra_Himem' + # task_queue3 = 'RAL_Rubin_Himem' + task_queue4 = 'RAL_Rubin_Merge' + # task_queue5 = 'RAL_Rubin_IO' + task_queue5 = 'RAL_Rubin_Extra_Himem' else: site = 'slac' # task_cloud = 'LSST' diff --git a/main/lib/idds/tests/test_domapanda_big.py b/main/lib/idds/tests/test_domapanda_big.py index c9e78354..c6d87723 100644 --- a/main/lib/idds/tests/test_domapanda_big.py +++ b/main/lib/idds/tests/test_domapanda_big.py @@ -57,6 +57,18 @@ task_queue3 = 'LANCS_Rubin_Extra_Himem' task_queue3 = 'LANCS_Rubin_Himem' task_queue4 = 'LANCS_Rubin_Merge' +elif len(sys.argv) > 1 and sys.argv[1] == "ral": + site = 'RAL' + task_cloud = 'EU' + # task_queue = 'RAL_TEST' + task_queue = 'RAL_Rubin' + task_queue1 = 'RAL_Rubin_Medium' + task_queue2 = 'RAL_Rubin_Himem' + task_queue3 = 'RAL_Rubin_Extra_Himem' + # task_queue3 = 'RAL_Rubin_Himem' + task_queue4 = 'RAL_Rubin_Merge' + # task_queue5 = 'RAL_Rubin_IO' + task_queue5 = 'RAL_Rubin_Extra_Himem' else: site = 'slac' # task_cloud = 'LSST' diff --git a/main/lib/idds/tests/test_domapanda_pandaclient.py b/main/lib/idds/tests/test_domapanda_pandaclient.py index be79b62a..2da32c31 100644 --- a/main/lib/idds/tests/test_domapanda_pandaclient.py +++ b/main/lib/idds/tests/test_domapanda_pandaclient.py @@ -40,8 +40,12 @@ import pandaclient.idds_api -task_queue = 'DOMA_LSST_GOOGLE_TEST' -task_queue = 'DOMA_LSST_GOOGLE_MERGE' +# task_queue = 'DOMA_LSST_GOOGLE_TEST' +# task_queue = 'DOMA_LSST_GOOGLE_MERGE' + +task_cloud = 'US' +task_queue = "SLAC_Rubin" +task_queue = 'BNL_OSG_2' def randStr(chars=string.ascii_lowercase + string.digits, N=10): @@ -135,7 +139,7 @@ def setup_workflow(): "token": "local", "type": "template", "value": "log.tgz"}, - task_cloud='LSST') + task_cloud=task_cloud) work2 = DomaPanDAWork(executable='echo', primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#2'}, output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#2'}], @@ -148,7 +152,7 @@ def setup_workflow(): "type": "template", "value": "log.tgz"}, encode_command_line=True, - task_cloud='LSST') + task_cloud=task_cloud) work3 = DomaPanDAWork(executable='echo', primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#3'}, output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#3'}], @@ -160,7 +164,7 @@ def setup_workflow(): "token": "local", "type": "template", "value": "log.tgz"}, - task_cloud='LSST', + task_cloud=task_cloud, encode_command_line=True) pending_time = 12 diff --git a/main/tools/env/install_idds.sh b/main/tools/env/install_idds.sh index 1033399a..9ef9ff46 100644 --- a/main/tools/env/install_idds.sh +++ b/main/tools/env/install_idds.sh @@ -14,6 +14,8 @@ bash workflow/tools/make/make.sh echo cp workflow/bin/run_workflow_wrapper ~/www/wiscgroup/ cp workflow/bin/run_workflow_wrapper ~/www/wiscgroup/ +echo cp workflow/bin/run_workflow_wrapper /eos/user/w/wguan/www/ +cp workflow/bin/run_workflow_wrapper /eos/user/w/wguan/www/ echo scp workflow/bin/run_workflow_wrapper root@ai-idds-04:/data/iddssv1/srv/var/trf/user/ scp workflow/bin/run_workflow_wrapper root@ai-idds-04:/data/iddssv1/srv/var/trf/user/ diff --git a/main/tools/env/setup_panda.sh b/main/tools/env/setup_panda.sh index be983c95..5f81d1cc 100644 --- a/main/tools/env/setup_panda.sh +++ b/main/tools/env/setup_panda.sh @@ -38,6 +38,8 @@ elif [ "$instance" == "usdf_dev" ]; then # export PANDA_AUTH_VO=Rubin.production + export IDDS_HOST=https://rubin-panda-idds-dev.slac.stanford.edu:8443/idds + export PANDACACHE_URL=$PANDA_URL_SSL export PANDA_SYS=/afs/cern.ch/user/w/wguan/workdisk/iDDS/.conda/iDDS/ @@ -58,6 +60,8 @@ elif [ "$instance" == "usdf" ]; then export PANDACACHE_URL=$PANDA_URL_SSL export PANDA_SYS=/afs/cern.ch/user/w/wguan/workdisk/iDDS/.conda/iDDS/ + export IDDS_HOST=https://usdf-panda-idds.slac.stanford.edu:8443/idds + # export PANDA_CONFIG_ROOT=/afs/cern.ch/user/w/wguan/workdisk/iDDS/main/etc/panda/ export PANDA_CONFIG_ROOT=~/.panda/ export IDDS_OIDC_TOKEN_FILE=~/.idds/.token_rubin_prod @@ -92,6 +96,27 @@ elif [ "$instance" == "new" ]; then PANDA_QUEUE=BNL_OSG_2 PANDA_WORKING_GROUP=EIC PANDA_VO=wlcg +elif [ "$instance" == "doma_k8s" ]; then + export PANDA_URL_SSL=https://panda-doma-k8s-panda.cern.ch/server/panda + export PANDA_URL=http://panda-doma-k8s-panda.cern.ch:25080/server/panda + export PANDACACHE_URL=https://panda-doma-k8s-panda.cern.ch/server/panda + export PANDAMON_URL=https://panda-doma-bigmon.cern.ch + export PANDA_AUTH=oidc + export PANDA_AUTH_VO=Rubin.production + export PANDA_AUTH_VO=EIC + export PANDA_AUTH_VO=Rubin + export PANDA_USE_NATIVE_HTTPLIB=1 + export PANDA_BEHIND_REAL_LB=true + + # export PANDA_AUTH_VO=Rubin.production + + export PANDACACHE_URL=$PANDA_URL_SSL + export PANDA_SYS=/afs/cern.ch/user/w/wguan/workdisk/iDDS/.conda/iDDS/ + + # export PANDA_CONFIG_ROOT=/afs/cern.ch/user/w/wguan/workdisk/iDDS/main/etc/panda/ + export PANDA_CONFIG_ROOT=~/.panda/ + + export IDDS_OIDC_TOKEN_FILE=~/.idds/.token_doma_k8s else export PANDA_AUTH=oidc export PANDA_URL_SSL=https://pandaserver-doma.cern.ch:25443/server/panda diff --git a/monitor/data/conf.js b/monitor/data/conf.js index 3c9fa3ea..51d0a63b 100644 --- a/monitor/data/conf.js +++ b/monitor/data/conf.js @@ -1,9 +1,9 @@ var appConfig = { - 'iddsAPI_request': "https://lxplus954.cern.ch:443/idds/monitor_request/null/null", - 'iddsAPI_transform': "https://lxplus954.cern.ch:443/idds/monitor_transform/null/null", - 'iddsAPI_processing': "https://lxplus954.cern.ch:443/idds/monitor_processing/null/null", - 'iddsAPI_request_detail': "https://lxplus954.cern.ch:443/idds/monitor/null/null/true/false/false", - 'iddsAPI_transform_detail': "https://lxplus954.cern.ch:443/idds/monitor/null/null/false/true/false", - 'iddsAPI_processing_detail': "https://lxplus954.cern.ch:443/idds/monitor/null/null/false/false/true" + 'iddsAPI_request': "https://lxplus965.cern.ch:443/idds/monitor_request/null/null", + 'iddsAPI_transform': "https://lxplus965.cern.ch:443/idds/monitor_transform/null/null", + 'iddsAPI_processing': "https://lxplus965.cern.ch:443/idds/monitor_processing/null/null", + 'iddsAPI_request_detail': "https://lxplus965.cern.ch:443/idds/monitor/null/null/true/false/false", + 'iddsAPI_transform_detail': "https://lxplus965.cern.ch:443/idds/monitor/null/null/false/true/false", + 'iddsAPI_processing_detail': "https://lxplus965.cern.ch:443/idds/monitor/null/null/false/false/true" } diff --git a/workflow/bin/run_workflow b/workflow/bin/run_workflow index e8eecd6d..4ae4ee9f 100644 --- a/workflow/bin/run_workflow +++ b/workflow/bin/run_workflow @@ -25,6 +25,7 @@ import os import sys import time import traceback +import zlib from idds.common.utils import json_dumps, json_loads, setup_logging, decode_base64 # from idds.common.utils import merge_dict @@ -40,16 +41,16 @@ def get_context_args(context, original_args, current_job_kwargs): func_name, pre_kwargs, args, kwargs, multi_jobs_kwargs_list = None, None, None, None, None if original_args: original_args = json_loads(original_args) - func_name, pre_kwargs, args, kwargs, multi_jobs_kwargs_list = original_args + func_name, pre_kwargs, args, kwargs = original_args if args: - args = pickle.loads(base64.b64decode(args)) + args = pickle.loads(zlib.decompress(base64.b64decode(args))) if pre_kwargs: - pre_kwargs = pickle.loads(base64.b64decode(pre_kwargs)) + pre_kwargs = pickle.loads(zlib.decompress(base64.b64decode(pre_kwargs))) if kwargs: - kwargs = pickle.loads(base64.b64decode(kwargs)) + kwargs = pickle.loads(zlib.decompress(base64.b64decode(kwargs))) if multi_jobs_kwargs_list: - multi_jobs_kwargs_list = [pickle.loads(base64.b64decode(k)) for k in multi_jobs_kwargs_list] + multi_jobs_kwargs_list = [pickle.loads(zlib.decompress(base64.b64decode(k))) for k in multi_jobs_kwargs_list] if current_job_kwargs: if current_job_kwargs == "${IN/L}": @@ -59,7 +60,7 @@ def get_context_args(context, original_args, current_job_kwargs): current_job_kwargs = json_loads(current_job_kwargs) if current_job_kwargs: - current_job_kwargs = pickle.loads(base64.b64decode(current_job_kwargs)) + current_job_kwargs = pickle.loads(zlib.decompress(base64.b64decode(current_job_kwargs))) # current_job_kwargs = current_job_kwargs # if current_job_kwargs and isinstance(current_job_kwargs, dict): @@ -70,8 +71,9 @@ def get_context_args(context, original_args, current_job_kwargs): return context, func_name, pre_kwargs, args, kwargs, multi_jobs_kwargs_list, current_job_kwargs -def run_workflow(context, original_args, current_job_kwargs): +def run_workflow(name, context, original_args, current_job_kwargs): context, func_name, pre_kwargs, args, kwargs, multi_jobs_kwargs_list, current_job_kwargs = get_context_args(context, original_args, current_job_kwargs) + logging.info("name: %s" % name) logging.info("context: %s" % context) logging.info("func_name: %s" % func_name) logging.info("pre_kwargs: %s" % pre_kwargs) @@ -83,7 +85,8 @@ def run_workflow(context, original_args, current_job_kwargs): context.initialize() context.setup_source_files() - workflow = Workflow(func=func_name, pre_kwargs=pre_kwargs, args=args, kwargs=kwargs, multi_jobs_kwargs_list=multi_jobs_kwargs_list, current_job_kwargs=current_job_kwargs, context=context) + workflow = Workflow(func=func_name, pre_kwargs=pre_kwargs, args=args, kwargs=kwargs, multi_jobs_kwargs_list=multi_jobs_kwargs_list, current_job_kwargs=current_job_kwargs, context=context, name=name) + workflow.load() logging.info("workflow: %s" % workflow) with workflow: ret = workflow.run() @@ -91,8 +94,9 @@ def run_workflow(context, original_args, current_job_kwargs): return 0 -def run_work(context, original_args, current_job_kwargs): +def run_work(name, context, original_args, current_job_kwargs): context, func_name, pre_kwargs, args, kwargs, multi_jobs_kwargs_list, current_job_kwargs = get_context_args(context, original_args, current_job_kwargs) + logging.info("name: %s" % name) logging.info("context: %s" % context) logging.info("func_name: %s" % func_name) logging.info("pre_kwargs: %s" % pre_kwargs) @@ -104,7 +108,8 @@ def run_work(context, original_args, current_job_kwargs): context.initialize() context.setup_source_files() - work = Work(func=func_name, pre_kwargs=pre_kwargs, args=args, kwargs=kwargs, multi_jobs_kwargs_list=multi_jobs_kwargs_list, current_job_kwargs=current_job_kwargs, context=context) + work = Work(func=func_name, pre_kwargs=pre_kwargs, args=args, kwargs=kwargs, multi_jobs_kwargs_list=multi_jobs_kwargs_list, current_job_kwargs=current_job_kwargs, context=context, name=name) + work.load() logging.info("work: %s" % work) ret = work.run() logging.info("run work result: %s" % str(ret)) @@ -133,6 +138,20 @@ def run_iworkflow(args): else: current_job_kwargs = None + if args.args_file: + with open(args.args_file, 'r') as file: + data = file.read() + args_content = decode_base64(data) + args_content = json_loads(args_content) + if 'type' in args_content: + args.type = args_content['type'] + if 'context' in args_content: + args.context = args_content['context'] + if 'original_args' in args_content: + args.original_args = args_content['original_args'] + if 'current_job_kwargs' in args_content: + args.current_job_kwargs = args_content['current_job_kwargs'] + if args.type == 'workflow': logging.info("run workflow") password = context.broker_password @@ -141,7 +160,7 @@ def run_iworkflow(args): context.broker_password = password logging.info("original_args: %s" % original_args) logging.info("current_job_kwargs: %s" % current_job_kwargs) - exit_code = run_workflow(context, original_args, current_job_kwargs) + exit_code = run_workflow(args.name, context, original_args, current_job_kwargs) logging.info("exit code: %s" % exit_code) else: logging.info("run work") @@ -151,7 +170,7 @@ def run_iworkflow(args): context.broker_password = password logging.info("original_args: %s" % original_args) logging.info("current_job_kwargs: %s" % current_job_kwargs) - exit_code = run_work(context, original_args, current_job_kwargs) + exit_code = run_work(args.name, context, original_args, current_job_kwargs) logging.info("exit code: %s" % exit_code) return exit_code @@ -179,9 +198,11 @@ def get_parser(): oparser.add_argument('--version', action='version', version='%(prog)s ' + release_version) oparser.add_argument('--verbose', '-v', default=False, action='store_true', help="Print more verbose output.") oparser.add_argument('--type', dest='type', action='store', choices=['workflow', 'work'], default='workflow', help='The type in [workflow, work]. Default is workflow.') + oparser.add_argument('--name', dest='name', help="The name.") oparser.add_argument('--context', dest='context', help="The context.") oparser.add_argument('--original_args', dest='original_args', help="The original arguments.") oparser.add_argument('--current_job_kwargs', dest='current_job_kwargs', nargs='?', const=None, help="The current job arguments.") + oparser.add_argument('--args_file', dest='args_file', help="The file with arguments") return oparser diff --git a/workflow/lib/idds/iworkflow/base.py b/workflow/lib/idds/iworkflow/base.py index a478ea70..a2abf900 100644 --- a/workflow/lib/idds/iworkflow/base.py +++ b/workflow/lib/idds/iworkflow/base.py @@ -12,10 +12,12 @@ import base64 import logging import inspect +import json import os import pickle import traceback import uuid +import zlib from typing import Any, Dict, List, Optional, Tuple, Union # noqa F401 @@ -87,15 +89,15 @@ def get_func_name_and_args(self, func_name = func if args: - args = base64.b64encode(pickle.dumps(args)).decode("utf-8") + args = base64.b64encode(zlib.compress(pickle.dumps(args))).decode("utf-8") if pre_kwargs: - pre_kwargs = base64.b64encode(pickle.dumps(pre_kwargs)).decode("utf-8") + pre_kwargs = base64.b64encode(zlib.compress(pickle.dumps(pre_kwargs))).decode("utf-8") if kwargs: - kwargs = base64.b64encode(pickle.dumps(kwargs)).decode("utf-8") + kwargs = base64.b64encode(zlib.compress(pickle.dumps(kwargs))).decode("utf-8") if multi_jobs_kwargs_list: - multi_jobs_kwargs_list = [base64.b64encode(pickle.dumps(k)).decode("utf-8") for k in multi_jobs_kwargs_list] + multi_jobs_kwargs_list = [base64.b64encode(zlib.compress(pickle.dumps(k))).decode("utf-8") for k in multi_jobs_kwargs_list] - return func_call, (func_name, pre_kwargs, args, kwargs, multi_jobs_kwargs_list) + return func_call, (func_name, pre_kwargs, args, kwargs), multi_jobs_kwargs_list @property def logger(self): @@ -123,6 +125,32 @@ def get_output_collections(self): def get_log_collections(self): return [] + def save_context(self, source_dir, name, context): + if source_dir and name and context: + try: + file_name = name + ".json" + file_name = os.path.join(source_dir, file_name) + with open(file_name, 'w') as f: + json.dump(context, f) + self.logger.info(f"Saved context to file {file_name}") + except Exception as ex: + self.logger.error(f"Failed to save context to file {file_name}: {ex}") + + def load_context(self, source_dir, name): + if source_dir and name: + try: + context = None + file_name = name + ".json" + file_name = os.path.join(source_dir, file_name) + if os.path.exists(file_name): + with open(file_name, 'r') as f: + context = json.load(f) + self.logger.info(f"Loading context from file {file_name}") + return context + except Exception as ex: + self.logger.error(f"Failed to load context from file: {ex}") + return [] + def prepare(self): """ Prepare the workflow: upload the source files to server. @@ -160,7 +188,7 @@ def setup(self): """ return None - def load(self, func_name): + def load_func(self, func_name): """ Load the function from the source files. @@ -218,3 +246,9 @@ def setup(self): :returns command: `str` to setup the workflow. """ return None + + +class CollectionBase(DictBase): + def __init__(self): + super(CollectionBase, self).__init__() + pass diff --git a/workflow/lib/idds/iworkflow/work.py b/workflow/lib/idds/iworkflow/work.py index 48404ed3..ef4401ef 100644 --- a/workflow/lib/idds/iworkflow/work.py +++ b/workflow/lib/idds/iworkflow/work.py @@ -19,6 +19,7 @@ import pickle import time import traceback +import zlib from idds.common import exceptions from idds.common.constants import WorkflowType, TransformStatus, AsyncResultStatus @@ -275,6 +276,11 @@ def broker_destination(self): def broker_destination(self, value): self._workflow_context.broker_destination = value + def get_source_dir(self): + if self._workflow_context: + return self._workflow_context.get_source_dir() + return None + @property def token(self): return self._workflow_context.token @@ -339,7 +345,7 @@ def setup(self): class Work(Base): def __init__(self, func=None, workflow_context=None, context=None, pre_kwargs=None, args=None, kwargs=None, multi_jobs_kwargs_list=None, - current_job_kwargs=None, map_results=False, source_dir=None, init_env=None, is_unique_func_name=False): + current_job_kwargs=None, map_results=False, source_dir=None, init_env=None, is_unique_func_name=False, name=None): """ Init a workflow. """ @@ -347,19 +353,23 @@ def __init__(self, func=None, workflow_context=None, context=None, pre_kwargs=No self.prepared = False # self._func = func - self._func, self._func_name_and_args = self.get_func_name_and_args(func, pre_kwargs, args, kwargs, multi_jobs_kwargs_list) + self._func, self._func_name_and_args, self._multi_jobs_kwargs_list = self.get_func_name_and_args(func, pre_kwargs, args, kwargs, multi_jobs_kwargs_list) self._func = None self._current_job_kwargs = current_job_kwargs if self._current_job_kwargs: - self._current_job_kwargs = base64.b64encode(pickle.dumps(self._current_job_kwargs)).decode("utf-8") + self._current_job_kwargs = base64.b64encode(zlib.compress(pickle.dumps(self._current_job_kwargs))).decode("utf-8") - self._name = self._func_name_and_args[0] - if self._name: - self._name = self._name.replace('__main__:', '').replace('.py', '').replace(':', '.') - if not is_unique_func_name: + if name: + self._name = name + else: + self._name = self._func_name_and_args[0] if self._name: - self._name = self._name + "_" + datetime.datetime.utcnow().strftime("%Y_%m_%d_%H_%M_%S") + self._name = self._name.replace('__main__:', '').replace('.py', '').replace(':', '.') + self._name = self._name.replace("/", "_").replace(".", "_").replace(":", "_") + if not is_unique_func_name: + if self._name: + self._name = self._name + "_" + datetime.datetime.utcnow().strftime("%Y_%m_%d_%H_%M_%S") if context: self._context = context @@ -373,6 +383,8 @@ def __init__(self, func=None, workflow_context=None, context=None, pre_kwargs=No self._async_result_initialized = False self._async_result_status = None + self._other_attributes = {} + @property def logger(self): return logging.getLogger(self.__class__.__name__) @@ -569,7 +581,7 @@ def token(self, value): @property def multi_jobs_kwargs_list(self): - return self._func_name_and_args[4] + return self._multi_jobs_kwargs_list @multi_jobs_kwargs_list.setter def multi_jobs_kwargs_list(self, value): @@ -584,6 +596,10 @@ def get_work_type(self): def get_work_name(self): return self._name + def add_other_attributes(self, other_attributes): + for k, v in other_attributes.items(): + self._other_attributes[k] = v + def to_dict(self): func = self._func self._func = None @@ -591,6 +607,28 @@ def to_dict(self): self._func = func return obj + def store(self): + if self._context: + content = {'type': 'work', + 'name': self.name, + 'context': self._context, + 'original_args': self._func_name_and_args, + 'multi_jobs_kwargs_list': self._multi_jobs_kwargs_list, + 'current_job_kwargs': self._current_job_kwargs} + source_dir = self._context.get_source_dir() + self.save_context(source_dir, self._name, content) + + def load(self, source_dir=None): + if not source_dir: + source_dir = self._context.get_source_dir() + if not source_dir: + source_dir = os.getcwd() + ret = self.load_context(source_dir, self._name) + if ret: + logging.info(f"Loaded context: {ret}") + if 'multi_jobs_kwargs_list' in ret: + self._multi_jobs_kwargs_list = ret['multi_jobs_kwargs_list'] + def submit_to_idds_server(self): """ Submit the workflow to the iDDS server. @@ -800,7 +838,7 @@ def get_func_name(self): def get_multi_jobs_kwargs_list(self): multi_jobs_kwargs_list = self.multi_jobs_kwargs_list - multi_jobs_kwargs_list = [pickle.loads(base64.b64decode(k)) for k in multi_jobs_kwargs_list] + multi_jobs_kwargs_list = [pickle.loads(zlib.decompress(base64.b64decode(k))) for k in multi_jobs_kwargs_list] return multi_jobs_kwargs_list def init_async_result(self): @@ -877,14 +915,14 @@ def setup(self): """ return self._context.setup() - def load(self, func_name): + def load_func(self, func_name): """ Load the function from the source files. :raise Exception """ with modified_environ(IDDS_IGNORE_WORK_DECORATOR='true'): - func = super(Work, self).load(func_name) + func = super(Work, self).load_func(func_name) return func @@ -917,22 +955,23 @@ def run(self): """ self.pre_run() - func_name, pre_kwargs, args, kwargs, multi_jobs_kwargs_list = self._func_name_and_args + func_name, pre_kwargs, args, kwargs = self._func_name_and_args + multi_jobs_kwargs_list = self.multi_jobs_kwargs_list current_job_kwargs = self._current_job_kwargs if args: - args = pickle.loads(base64.b64decode(args)) + args = pickle.loads(zlib.decompress(base64.b64decode(args))) if pre_kwargs: - pre_kwargs = pickle.loads(base64.b64decode(pre_kwargs)) + pre_kwargs = pickle.loads(zlib.decompress(base64.b64decode(pre_kwargs))) if kwargs: - kwargs = pickle.loads(base64.b64decode(kwargs)) + kwargs = pickle.loads(zlib.decompress(base64.b64decode(kwargs))) if multi_jobs_kwargs_list: - multi_jobs_kwargs_list = [pickle.loads(base64.b64decode(k)) for k in multi_jobs_kwargs_list] + multi_jobs_kwargs_list = [pickle.loads(zlib.decompress(base64.b64decode(k))) for k in multi_jobs_kwargs_list] if self._current_job_kwargs: - current_job_kwargs = pickle.loads(base64.b64decode(current_job_kwargs)) + current_job_kwargs = pickle.loads(zlib.decompress(base64.b64decode(current_job_kwargs))) if self._func is None: - func = self.load(func_name) + func = self.load_func(func_name) self._func = func if self._context.distributed: @@ -998,15 +1037,30 @@ def run(self): return self._results def get_run_command(self): - cmd = "run_workflow --type work " + cmd = "run_workflow --type work --name %s " % self.name cmd += "--context %s --original_args %s " % (encode_base64(json_dumps(self._context)), encode_base64(json_dumps(self._func_name_and_args))) cmd += "--current_job_kwargs ${IN/L}" return cmd + def get_run_args_to_file_cmd(self): + args = {'type': 'work', + 'name': self.name, + 'context': self._context, + 'original_args': self._func_name_and_args, + 'current_job_kwargs': '${IN/L}'} + args_json = encode_base64(json_dumps(args)) + cmd = 'echo ' + args_json + ' > run_workflow_args; ' + return cmd + + def get_run_command_test(self): + cmd = "run_workflow.sh" + return cmd + def get_runner(self): setup = self.setup() cmd = "" + run_command = self.get_run_command() if setup: diff --git a/workflow/lib/idds/iworkflow/workflow.py b/workflow/lib/idds/iworkflow/workflow.py index 3a3f6bcc..3d271212 100644 --- a/workflow/lib/idds/iworkflow/workflow.py +++ b/workflow/lib/idds/iworkflow/workflow.py @@ -19,6 +19,7 @@ import pickle import tarfile import uuid +import zlib # from types import ModuleType @@ -310,6 +311,9 @@ def broker_destination(self): def broker_destination(self, value): self._broker_destination = value + def get_source_dir(self): + return self._source_dir + @property def token(self): return self._token @@ -676,7 +680,7 @@ def prepare(self): class Workflow(Base): def __init__(self, func=None, service='panda', context=None, source_dir=None, local=False, distributed=True, - pre_kwargs={}, args=None, kwargs={}, multi_jobs_kwargs_list=[], current_job_kwargs=None, + pre_kwargs={}, args=None, kwargs={}, multi_jobs_kwargs_list=[], current_job_kwargs=None, name=None, init_env=None, is_unique_func_name=False, max_walltime=24 * 3600, source_dir_parent_level=None): """ Init a workflow. @@ -685,17 +689,21 @@ def __init__(self, func=None, service='panda', context=None, source_dir=None, lo self.prepared = False # self._func = func - self._func, self._func_name_and_args = self.get_func_name_and_args(func, pre_kwargs, args, kwargs, multi_jobs_kwargs_list) + self._func, self._func_name_and_args, self._multi_jobs_kwargs_list = self.get_func_name_and_args(func, pre_kwargs, args, kwargs, multi_jobs_kwargs_list) self._current_job_kwargs = current_job_kwargs if self._current_job_kwargs: - self._current_job_kwargs = base64.b64encode(pickle.dumps(self._current_job_kwargs)).decode("utf-8") + self._current_job_kwargs = base64.b64encode(zlib.compress(pickle.dumps(self._current_job_kwargs))).decode("utf-8") - self._name = self._func_name_and_args[0] - if self._name: - self._name = self._name.replace('__main__:', '').replace('.py', '').replace(':', '.') - if not is_unique_func_name: + if name: + self._name = name + else: + self._name = self._func_name_and_args[0] if self._name: - self._name = self._name + "_" + datetime.datetime.utcnow().strftime("%Y_%m_%d_%H_%M_%S") + self._name = self._name.replace('__main__:', '').replace('.py', '').replace(':', '.') + self._name = self._name.replace("/", "_").replace(".", "_").replace(":", "_") + if not is_unique_func_name: + if self._name: + self._name = self._name + "_" + datetime.datetime.utcnow().strftime("%Y_%m_%d_%H_%M_%S") source_dir = self.get_source_dir(self._func, source_dir, source_dir_parent_level=source_dir_parent_level) workflow_type = WorkflowType.iWorkflow @@ -889,7 +897,7 @@ def get_work_name(self): @property def multi_jobs_kwargs_list(self): - return self._func_name_and_args[4] + return self._multi_jobs_kwargs_list @multi_jobs_kwargs_list.setter def multi_jobs_kwargs_list(self, value): @@ -919,6 +927,28 @@ def get_source_dir(self, func, source_dir, source_dir_parent_level=None): return source_dir return None + def store(self): + if self._context: + content = {'type': 'work', + 'name': self.name, + 'context': self._context, + 'original_args': self._func_name_and_args, + 'multi_jobs_kwargs_list': self._multi_jobs_kwargs_list, + 'current_job_kwargs': self._current_job_kwargs} + source_dir = self._context.get_source_dir() + self.save_context(source_dir, self._name, content) + + def load(self, source_dir=None): + if not source_dir: + source_dir = self._context.get_source_dir() + if not source_dir: + source_dir = os.getcwd() + ret = self.load_context(source_dir, self._name) + if ret: + logging.info(f"Loaded context: {ret}") + if 'multi_jobs_kwargs_list' in ret: + self._multi_jobs_kwargs_list = ret['multi_jobs_kwargs_list'] + def prepare(self): """ Prepare the workflow: for example uploading the source codes to cache server. @@ -1046,7 +1076,8 @@ def close(self): logging.error("Failed to close request(%s): %s" % (self._context.request_id, str(ex))) def __del__(self): - self.close() + # self.close() + pass def setup(self): """ @@ -1060,14 +1091,14 @@ def setup_source_files(self): """ return self._context.setup_source_files() - def load(self, func_name): + def load_func(self, func_name): """ Load the function from the source files. :raise Exception """ with modified_environ(IDDS_IGNORE_WORKFLOW_DECORATOR='true'): - func = super(Workflow, self).load(func_name) + func = super(Workflow, self).load_func(func_name) return func @@ -1106,18 +1137,19 @@ def run(self): if True: self.pre_run() - func_name, pre_kwargs, args, kwargs, multi_jobs_kwargs_list = self._func_name_and_args + func_name, pre_kwargs, args, kwargs = self._func_name_and_args + multi_jobs_kwargs_list = self.multi_jobs_kwargs_list if args: - args = pickle.loads(base64.b64decode(args)) + args = pickle.loads(zlib.decompress(base64.b64decode(args))) if pre_kwargs: - pre_kwargs = pickle.loads(base64.b64decode(pre_kwargs)) + pre_kwargs = pickle.loads(zlib.decompress(base64.b64decode(pre_kwargs))) if kwargs: - kwargs = pickle.loads(base64.b64decode(kwargs)) + kwargs = pickle.loads(zlib.decompress(base64.b64decode(kwargs))) if multi_jobs_kwargs_list: - multi_jobs_kwargs_list = [pickle.loads(base64.b64decode(k)) for k in multi_jobs_kwargs_list] + multi_jobs_kwargs_list = [pickle.loads(zlib.decompress(base64.b64decode(k))) for k in multi_jobs_kwargs_list] if self._func is None: - func = self.load(func_name) + func = self.load_func(func_name) self._func = func ret = self.run_func(self._func, pre_kwargs, args, kwargs) @@ -1136,7 +1168,7 @@ def __exit__(self, _type, _value, _tb): # /Context Manager ---------------------------------------------- def get_run_command(self): - cmd = "run_workflow --type workflow " + cmd = "run_workflow --type workflow --name %s " % self.name cmd += "--context %s --original_args %s " % (encode_base64(json_dumps(self._context)), encode_base64(json_dumps(self._func_name_and_args))) cmd += "--current_job_kwargs ${IN/L}" @@ -1184,6 +1216,10 @@ def wrapper(*args, **kwargs): f.site = site f.cloud = cloud + logging.info("return_workflow %s" % return_workflow) + if return_workflow: + return f + logging.info("Prepare workflow") f.prepare() logging.info("Prepared workflow") @@ -1191,10 +1227,6 @@ def wrapper(*args, **kwargs): logging.info("Registering workflow") f.submit() - logging.info("return_workflow %s" % return_workflow) - if return_workflow: - return f - if not local: logging.info("Run workflow at remote sites") return f From aff705c191cbe768e7995a234fb0c1c9276efc9f Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Fri, 7 Jun 2024 11:47:24 +0200 Subject: [PATCH 3/3] improve iworkflow to handle long arguments --- workflow/tools/make/zipheader_for_file | 106 +++++++++++++++++++++++++ 1 file changed, 106 insertions(+) create mode 100644 workflow/tools/make/zipheader_for_file diff --git a/workflow/tools/make/zipheader_for_file b/workflow/tools/make/zipheader_for_file new file mode 100644 index 00000000..f7ec5a29 --- /dev/null +++ b/workflow/tools/make/zipheader_for_file @@ -0,0 +1,106 @@ +#!/bin/bash + +pwd +ls + +current_dir=$PWD +cmdfile="run_workflow.sh" + +echo ${current_dir}/$cmdfile + +cat <<- EOF > ${current_dir}/$cmdfile +#/bin/bash + +current_dir=\$PWD +echo "current dir: " \${current_dir} + +chmod +x \${current_dir}/bin/* +ln -fs \${current_dir}/bin/* \${current_dir}/ + +export PATH=\${current_dir}:\${current_dir}/tmp_bin:\${current_dir}/bin:\$PATH +export PYTHONPATH=\${current_dir}:\${current_dir}/lib_py:\$PYTHONPATH +export IDDS_CONFIG=\${current_dir}/etc/idds/idds.cfg.client.template + +if [[ ! -z "\${PANDA_AUTH_DIR}" ]] && [[ ! -z "\${PANDA_AUTH_ORIGIN}" ]]; then + export PANDA_AUTH_ID_TOKEN=\$(cat \$PANDA_AUTH_DIR); + export PANDA_AUTH_VO=\$PANDA_AUTH_ORIGIN; + export IDDS_OIDC_TOKEN=\$(cat \$PANDA_AUTH_DIR); + export IDDS_VO=\$PANDA_AUTH_ORIGIN; + export PANDA_AUTH=oidc; +else + unset PANDA_AUTH; + export IDDS_AUTH_TYPE=x509_proxy; + if [[ -f \$X509_USER_PROXY ]]; then + cp \$X509_USER_PROXY \${current_dir}/x509_proxy + fi +fi; + +export IDDS_LOG_LEVEL=debug + +export PANDA_CONFIG_ROOT=\$(pwd); +export PANDA_VERIFY_HOST=off; +export PANDA_BEHIND_REAL_LB=true; + +if ! command -v python &> /dev/null +then + echo "no python, alias python3 to python" + alias python=python3 +fi + +if [[ -f \${current_dir}/x509_proxy ]]; then + export X509_USER_PROXY=\${current_dir}/x509_proxy +fi + +myargs="\$@" +setup="" +pre_setup="" + +POSITIONAL=() +while [[ \$# -gt 0 ]]; do + key="\$1" + case \$key in + --setup) + setup="\$2" + shift + shift + ;; + --pre_setup) + pre_setup="\$2" + shift + shift + ;; + *) + POSITIONAL+=("\$1") # save it in an array for later + shift + ;; + esac +done + +set -- "\${POSITIONAL[@]}" # restore positional parameters + +echo "pre_setup: " \$pre_setup +echo "setup:" \$setup + +run_args=\$@ +echo "run_args: " \$run_args + +# echo \$run_args + +cmd="\$pre_setup \$setup \${run_args}" +echo \$cmd +eval \$cmd +ret=\$? + +echo rm -fr \${current_dir}/lib_py \${current_dir}/etc \${current_dir}/bin \${current_dir}/tmp_bin \${current_dir}/run_workflow_wrapper \${current_dir}/__pycache__ +rm -fr \${current_dir}/lib_py \${current_dir}/etc \${current_dir}/bin \${current_dir}/tmp_bin \${current_dir}/run_workflow_wrapper \${current_dir}/__pycache__ + +echo "return code: " \$ret +exit \$ret + +EOF + +chmod +x ${current_dir}/$cmdfile + +echo pwd +pwd; ls +exit 0