diff --git a/main/lib/idds/agents/carrier/plugins/base.py b/main/lib/idds/agents/carrier/plugins/base.py index bf3cf862..1d559398 100644 --- a/main/lib/idds/agents/carrier/plugins/base.py +++ b/main/lib/idds/agents/carrier/plugins/base.py @@ -120,6 +120,11 @@ def get_task_params(self, work): task_param_map['reqID'] = work.request_id + if work.container_options: + if type(work.container_options) in [dict] and work.container_options.get('container_image', None): + container_image = work.container_options.get('container_image', None) + task_param_map['container_name'] = container_image + return task_param_map def submit(self, *args, **kwargs): diff --git a/main/lib/idds/agents/conductor/conductor.py b/main/lib/idds/agents/conductor/conductor.py index 3d9f9a7b..99b4485b 100644 --- a/main/lib/idds/agents/conductor/conductor.py +++ b/main/lib/idds/agents/conductor/conductor.py @@ -6,7 +6,7 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2019 - 2023 +# - Wen Guan, , 2019 - 2024 import datetime import random @@ -120,12 +120,15 @@ def get_new_messages(self): destination = [MessageDestination.Outside, MessageDestination.ContentExt, MessageDestination.AsyncResult] messages = core_messages.retrieve_messages(status=MessageStatus.New, min_request_id=BaseAgent.min_request_id, + delay=60, + record_fetched=True, bulk_size=self.retrieve_bulk_size, destination=destination) # self.logger.debug("Main thread get %s new messages" % len(messages)) if messages: self.logger.info("Main thread get %s new messages" % len(messages)) + return messages def get_retry_messages(self): @@ -140,9 +143,12 @@ def get_retry_messages(self): retry_messages = [] destination = [MessageDestination.Outside, MessageDestination.ContentExt, MessageDestination.AsyncResult] - messages_d = core_messages.retrieve_messages(status=MessageStatus.Delivered, + messages_d = core_messages.retrieve_messages(status=[MessageStatus.Delivered, MessageStatus.Fetched], min_request_id=BaseAgent.min_request_id, use_poll_period=True, + delay=120, + record_fetched=True, + record_fetched_status=MessageStatus.Delivered, bulk_size=self.retrieve_bulk_size, destination=destination) # msg_type=msg_type) if messages_d: diff --git a/main/lib/idds/core/messages.py b/main/lib/idds/core/messages.py index 78f292c6..96ee78fb 100644 --- a/main/lib/idds/core/messages.py +++ b/main/lib/idds/core/messages.py @@ -13,6 +13,7 @@ operations related to Messages. """ +import datetime import threading from idds.common.constants import MessageDestination, MessageType, MessageStatus @@ -46,11 +47,13 @@ def add_messages(messages, bulk_size=1000, session=None): return orm_messages.add_messages(messages, bulk_size=bulk_size, session=session) -@read_session +@transactional_session def retrieve_messages(bulk_size=None, msg_type=None, status=None, destination=None, source=None, request_id=None, workload_id=None, transform_id=None, - processing_id=None, use_poll_period=False, retries=None, delay=None, - min_request_id=None, fetching_id=None, internal_id=None, session=None): + processing_id=None, use_poll_period=False, retries=None, delay=60, + min_request_id=None, fetching_id=None, internal_id=None, + record_fetched=False, record_fetched_status=MessageStatus.Fetched, + session=None): """ Retrieve up to $bulk messages. @@ -66,13 +69,24 @@ def retrieve_messages(bulk_size=None, msg_type=None, status=None, destination=No hb_thread = threading.current_thread() fetching_id = hb_thread.ident - return orm_messages.retrieve_messages(bulk_size=bulk_size, msg_type=msg_type, - status=status, source=source, destination=destination, - request_id=request_id, workload_id=workload_id, - transform_id=transform_id, processing_id=processing_id, - retries=retries, delay=delay, fetching_id=fetching_id, - min_request_id=min_request_id, internal_id=internal_id, - use_poll_period=use_poll_period, session=session) + messages = orm_messages.retrieve_messages(bulk_size=bulk_size, msg_type=msg_type, + status=status, source=source, destination=destination, + request_id=request_id, workload_id=workload_id, + transform_id=transform_id, processing_id=processing_id, + retries=retries, delay=delay, fetching_id=fetching_id, + min_request_id=min_request_id, internal_id=internal_id, + use_poll_period=use_poll_period, session=session) + if record_fetched: + to_updates = [] + for msg in messages: + to_update = {'msg_id': msg['msg_id'], + 'request_id': msg['request_id'], + 'poll_period': datetime.timedelta(seconds=delay), + 'status': record_fetched_status} + to_updates.append(to_update) + if to_updates: + orm_messages.update_messages(to_updates, min_request_id=min_request_id, session=session) + return messages @read_session diff --git a/main/lib/idds/orm/messages.py b/main/lib/idds/orm/messages.py index dca4566b..6a5d343e 100644 --- a/main/lib/idds/orm/messages.py +++ b/main/lib/idds/orm/messages.py @@ -24,7 +24,7 @@ from idds.common.constants import MessageDestination from idds.common.utils import group_list from idds.orm.base import models -from idds.orm.base.session import read_session, transactional_session +from idds.orm.base.session import transactional_session @transactional_session @@ -133,7 +133,7 @@ def update_messages(messages, bulk_size=1000, use_bulk_update_mappings=False, re raise exceptions.DatabaseException('Could not persist message: %s' % str(e)) -@read_session +@transactional_session def retrieve_messages(bulk_size=1000, msg_type=None, status=None, source=None, destination=None, request_id=None, workload_id=None, transform_id=None, processing_id=None, fetching_id=None, @@ -162,13 +162,18 @@ def retrieve_messages(bulk_size=1000, msg_type=None, status=None, source=None, msg_type = [msg_type] if len(msg_type) == 1: msg_type = [msg_type[0], msg_type[0]] + if status is not None: + if not isinstance(status, (list, tuple)): + status = [status] + if len(status) == 1: + status = [status[0], status[0]] query = session.query(models.Message) if msg_type is not None: query = query.filter(models.Message.msg_type.in_(msg_type)) if status is not None: - query = query.filter_by(status=status) + query = query.filter(models.Message.status.in_(status)) if source is not None: query = query.filter_by(source=source) if destination is not None: diff --git a/main/lib/idds/tests/panda_test.py b/main/lib/idds/tests/panda_test.py index d2cde25d..9752c723 100644 --- a/main/lib/idds/tests/panda_test.py +++ b/main/lib/idds/tests/panda_test.py @@ -83,6 +83,7 @@ task_ids = [16700, 16704, 17055, 17646, 17792, 18509, 19754, 21666, 21714, 21739, 16148, 16149, 16150] task_ids = [473, 472] + [i for i in range(325, 345)] task_ids = [476, 477, 478] +task_ids = [937, 938, 940, 941] for task_id in task_ids: print("Killing %s" % task_id) ret = Client.killTask(task_id, verbose=True) diff --git a/monitor/data/conf.js b/monitor/data/conf.js index e875986b..27dfe021 100644 --- a/monitor/data/conf.js +++ b/monitor/data/conf.js @@ -1,9 +1,9 @@ var appConfig = { - 'iddsAPI_request': "https://lxplus972.cern.ch:443/idds/monitor_request/null/null", - 'iddsAPI_transform': "https://lxplus972.cern.ch:443/idds/monitor_transform/null/null", - 'iddsAPI_processing': "https://lxplus972.cern.ch:443/idds/monitor_processing/null/null", - 'iddsAPI_request_detail': "https://lxplus972.cern.ch:443/idds/monitor/null/null/true/false/false", - 'iddsAPI_transform_detail': "https://lxplus972.cern.ch:443/idds/monitor/null/null/false/true/false", - 'iddsAPI_processing_detail': "https://lxplus972.cern.ch:443/idds/monitor/null/null/false/false/true" + 'iddsAPI_request': "https://lxplus925.cern.ch:443/idds/monitor_request/null/null", + 'iddsAPI_transform': "https://lxplus925.cern.ch:443/idds/monitor_transform/null/null", + 'iddsAPI_processing': "https://lxplus925.cern.ch:443/idds/monitor_processing/null/null", + 'iddsAPI_request_detail': "https://lxplus925.cern.ch:443/idds/monitor/null/null/true/false/false", + 'iddsAPI_transform_detail': "https://lxplus925.cern.ch:443/idds/monitor/null/null/false/true/false", + 'iddsAPI_processing_detail': "https://lxplus925.cern.ch:443/idds/monitor/null/null/false/false/true" } diff --git a/workflow/lib/idds/iworkflow/base.py b/workflow/lib/idds/iworkflow/base.py index b4bcb5c1..cb65fca7 100644 --- a/workflow/lib/idds/iworkflow/base.py +++ b/workflow/lib/idds/iworkflow/base.py @@ -210,12 +210,12 @@ def run_func(self, func, pre_kwargs, args, kwargs): """ try: logging.info(f"func type: {type(func)}: {str(func)}") - logging.info("pre_kwargs type: {type(pre_kwargs)}: {str(pre_kwargs)}") - logging.info("args type: {type(args)}: {str(args)}") - logging.info("kwargs type: {type(kwargs)}: {str(kwargs)}") + logging.info(f"pre_kwargs type: {type(pre_kwargs)}: {str(pre_kwargs)}") + logging.info(f"args type: {type(args)}: {str(args)}") + logging.info(f"kwargs type: {type(kwargs)}: {str(kwargs)}") kwargs_copy = copy.deepcopy(pre_kwargs) kwargs_copy.update(kwargs) - logging.info("start to run function: {str(func)}") + logging.info(f"start to run function: {str(func)}") if kwargs_copy: ret = func(*args, **kwargs_copy) else: @@ -223,7 +223,7 @@ def run_func(self, func, pre_kwargs, args, kwargs): logging.info(f"Successfully run function, ret: {ret}") return True, ret, None except Exception as ex: - logging.error("Failed to run the function: {str(ex)}") + logging.error(f"Failed to run the function: {str(ex)}") logging.error(traceback.format_exc()) return False, None, str(ex) diff --git a/workflow/lib/idds/iworkflow/work.py b/workflow/lib/idds/iworkflow/work.py index 5f9a2d2c..c8cbed0b 100644 --- a/workflow/lib/idds/iworkflow/work.py +++ b/workflow/lib/idds/iworkflow/work.py @@ -34,7 +34,7 @@ class WorkContext(Context): - def __init__(self, name=None, workflow_context=None, source_dir=None, init_env=None): + def __init__(self, name=None, workflow_context=None, source_dir=None, init_env=None, container_options=None): super(WorkContext, self).__init__() self._workflow_context = workflow_context self._transform_id = None @@ -54,6 +54,7 @@ def __init__(self, name=None, workflow_context=None, source_dir=None, init_env=N self._map_results = False self.init_env = init_env + self.container_options = container_options def get_service(self): return self._workflow_context.service @@ -315,6 +316,16 @@ def init_env(self, value): if self._init_env: self._init_env = self._init_env + " " + @property + def container_options(self): + if self._container_options: + return self._container_options + return self._workflow_context.container_options + + @container_options.setter + def container_options(self, value): + self._container_options = value + def get_idds_server(self): return self._workflow_context.get_idds_server() @@ -356,7 +367,8 @@ def get_clean_env(self): class Work(Base): def __init__(self, func=None, workflow_context=None, context=None, pre_kwargs=None, args=None, kwargs=None, multi_jobs_kwargs_list=None, - current_job_kwargs=None, map_results=False, source_dir=None, init_env=None, is_unique_func_name=False, name=None): + current_job_kwargs=None, map_results=False, source_dir=None, init_env=None, is_unique_func_name=False, name=None, + container_options=None): """ Init a workflow. """ @@ -385,7 +397,7 @@ def __init__(self, func=None, workflow_context=None, context=None, pre_kwargs=No if context: self._context = context else: - self._context = WorkContext(name=self._name, workflow_context=workflow_context, init_env=init_env) + self._context = WorkContext(name=self._name, workflow_context=workflow_context, init_env=init_env, container_options=container_options) self._async_ret = None @@ -590,6 +602,14 @@ def enable_separate_log(self): def enable_separate_log(self, value): self._context.enable_separate_log = value + @property + def container_options(self): + return self._context.container_options + + @container_options.setter + def container_options(self, value): + self._context.container_options = value + @property def token(self): return self._context.token @@ -1017,7 +1037,6 @@ def run_local(self): self._func = func if self._context.distributed: - rets = None args_copy = copy.deepcopy(args) pre_kwargs_copy = copy.deepcopy(pre_kwargs) kwargs_copy = copy.deepcopy(kwargs) @@ -1030,15 +1049,16 @@ def run_local(self): request_id = self._context.request_id transform_id = self._context.transform_id - logging.info("publishing AsyncResult to (request_id: %s, transform_id: %s): %s" % (request_id, transform_id, rets)) + ret_log = f"(status: {ret_status}, return: {ret_output}, error: {ret_err})" + logging.info(f"publishing AsyncResult to (request_id: {request_id}, transform_id: {transform_id}): {ret_log}") async_ret = AsyncResult(self._context, name=self.get_func_name(), internal_id=self.internal_id, current_job_kwargs=current_job_kwargs) async_ret.publish(ret_output, ret_status=ret_status, ret_error=ret_err) if not self.map_results: - self._results = rets + self._results = ret_output else: self._results = MapResult() - self._results.add_result(name=self.get_func_name(), args=current_job_kwargs, result=rets) + self._results.add_result(name=self.get_func_name(), args=current_job_kwargs, result=ret_output) return self._results else: if not multi_jobs_kwargs_list: @@ -1143,10 +1163,11 @@ def run_work_distributed(w): # foo = work(arg)(foo) -def work(func=None, *, workflow=None, pre_kwargs={}, name=None, return_work=False, map_results=False, lazy=False, init_env=None, no_wraps=False): +def work(func=None, *, workflow=None, pre_kwargs={}, name=None, return_work=False, map_results=False, lazy=False, init_env=None, no_wraps=False, + container_options=None): if func is None: return functools.partial(work, workflow=workflow, pre_kwargs=pre_kwargs, return_work=return_work, no_wraps=no_wraps, - name=name, map_results=map_results, lazy=lazy, init_env=init_env) + name=name, map_results=map_results, lazy=lazy, init_env=init_env, container_options=container_options) if 'IDDS_IGNORE_WORK_DECORATOR' in os.environ: return func @@ -1163,7 +1184,8 @@ def wrapper(*args, **kwargs): if workflow_context: logging.debug("setup work") w = Work(workflow_context=workflow_context, func=func, pre_kwargs=pre_kwargs, args=args, kwargs=kwargs, - name=name, multi_jobs_kwargs_list=multi_jobs_kwargs_list, map_results=map_results, init_env=init_env) + name=name, multi_jobs_kwargs_list=multi_jobs_kwargs_list, map_results=map_results, init_env=init_env, + container_options=container_options) # if distributed: if return_work: diff --git a/workflow/lib/idds/iworkflow/workflow.py b/workflow/lib/idds/iworkflow/workflow.py index a6a8d071..8edb5d07 100644 --- a/workflow/lib/idds/iworkflow/workflow.py +++ b/workflow/lib/idds/iworkflow/workflow.py @@ -57,7 +57,8 @@ def get_current_workflow(cls): class WorkflowContext(Context): def __init__(self, name=None, service='panda', source_dir=None, workflow_type=WorkflowType.iWorkflow, distributed=True, - max_walltime=24 * 3600, init_env=None, exclude_source_files=[], clean_env=None, enable_separate_log=False): + max_walltime=24 * 3600, init_env=None, exclude_source_files=[], clean_env=None, enable_separate_log=False, + container_options=None): super(WorkflowContext, self).__init__() self._service = service # panda, idds, sharefs self._request_id = None @@ -123,6 +124,8 @@ def __init__(self, name=None, service='panda', source_dir=None, workflow_type=Wo self._enable_separate_log = enable_separate_log + self._container_options = container_options + @property def logger(self): return logging.getLogger(self.__class__.__name__) @@ -344,6 +347,14 @@ def broker_destination(self, value): def get_source_dir(self): return self._source_dir + @property + def container_options(self): + return self._container_options + + @container_options.setter + def container_options(self, value): + self._container_options = value + @property def token(self): return self._token @@ -741,7 +752,7 @@ class Workflow(Base): def __init__(self, func=None, service='panda', context=None, source_dir=None, local=False, distributed=True, pre_kwargs={}, args=None, kwargs={}, multi_jobs_kwargs_list=[], current_job_kwargs=None, name=None, init_env=None, is_unique_func_name=False, max_walltime=24 * 3600, source_dir_parent_level=None, - enable_separate_log=False, exclude_source_files=[], clean_env=None): + enable_separate_log=False, exclude_source_files=[], clean_env=None, container_options=None): """ Init a workflow. """ @@ -777,7 +788,7 @@ def __init__(self, func=None, service='panda', context=None, source_dir=None, lo self._context = WorkflowContext(name=self._name, service=service, workflow_type=workflow_type, source_dir=source_dir, distributed=distributed, init_env=init_env, max_walltime=max_walltime, exclude_source_files=exclude_source_files, clean_env=clean_env, - enable_separate_log=enable_separate_log) + enable_separate_log=enable_separate_log, container_options=container_options) @property def service(self): @@ -960,6 +971,14 @@ def enable_separate_log(self): def enable_separate_log(self, value): self._context.enable_separate_log = value + @property + def container_options(self): + return self._context.container_options + + @container_options.setter + def container_options(self, value): + self._context.container_options = value + def get_work_tag(self): return self._context.workflow_type.name @@ -1302,12 +1321,14 @@ def get_func_name(self): # foo = workflow(arg)(foo) def workflow(func=None, *, local=False, service='idds', source_dir=None, primary=False, queue=None, site=None, cloud=None, max_walltime=24 * 3600, distributed=True, init_env=None, pre_kwargs={}, return_workflow=False, no_wraps=False, - source_dir_parent_level=None, exclude_source_files=[], enable_separate_log=False, clean_env=None): + source_dir_parent_level=None, exclude_source_files=[], enable_separate_log=False, clean_env=None, + container_options=None): if func is None: return functools.partial(workflow, local=local, service=service, source_dir=source_dir, primary=primary, queue=queue, site=site, cloud=cloud, max_walltime=max_walltime, distributed=distributed, init_env=init_env, pre_kwargs=pre_kwargs, no_wraps=no_wraps, return_workflow=return_workflow, source_dir_parent_level=source_dir_parent_level, - exclude_source_files=exclude_source_files, clean_env=clean_env, enable_separate_log=enable_separate_log) + exclude_source_files=exclude_source_files, clean_env=clean_env, enable_separate_log=enable_separate_log, + container_options=container_options) if 'IDDS_IGNORE_WORKFLOW_DECORATOR' in os.environ: return func @@ -1317,7 +1338,8 @@ def wrapper(*args, **kwargs): try: f = Workflow(func, service=service, source_dir=source_dir, local=local, max_walltime=max_walltime, distributed=distributed, pre_kwargs=pre_kwargs, args=args, kwargs=kwargs, init_env=init_env, source_dir_parent_level=source_dir_parent_level, - exclude_source_files=exclude_source_files, clean_env=clean_env, enable_separate_log=enable_separate_log) + exclude_source_files=exclude_source_files, clean_env=clean_env, enable_separate_log=enable_separate_log, + container_options=container_options) f.queue = queue f.site = site diff --git a/workflow/tools/make/zipheader b/workflow/tools/make/zipheader index 92643f4f..21327383 100644 --- a/workflow/tools/make/zipheader +++ b/workflow/tools/make/zipheader @@ -60,7 +60,7 @@ echo $new_command if [[ -L "python" ]]; then unlink python; fi -echo "To run new command:" +echo "running new command, outputs:" # eval $cmd eval $new_command ret=$?