diff --git a/client/lib/idds/client/clientmanager.py b/client/lib/idds/client/clientmanager.py index 54384c35..da467ec0 100644 --- a/client/lib/idds/client/clientmanager.py +++ b/client/lib/idds/client/clientmanager.py @@ -437,14 +437,14 @@ def submit(self, workflow, username=None, userdn=None, use_dataset_name=False): transform_tag = 'workflow' priority = 0 try: - if workflow.type in [WorkflowType.iWorkflow]: + if workflow.workflow_type in [WorkflowType.iWorkflow]: scope = 'iworkflow' request_type = RequestType.iWorkflow transform_tag = workflow.get_work_tag() priority = workflow.priority if priority is None: priority = 0 - elif workflow.type in [WorkflowType.iWorkflowLocal]: + elif workflow.workflow_type in [WorkflowType.iWorkflowLocal]: scope = 'iworkflowLocal' request_type = RequestType.iWorkflowLocal transform_tag = workflow.get_work_tag() @@ -507,14 +507,14 @@ def submit_work(self, request_id, work, use_dataset_name=False): priority = 0 workload_id = None try: - if work.type in [WorkflowType.iWork]: + if work.workflow_type in [WorkflowType.iWork]: transform_type = TransformType.iWork transform_tag = work.get_work_tag() workload_id = work.workload_id priority = work.priority if priority is None: priority = 0 - elif work.type in [WorkflowType.iWorkflow]: + elif work.workflow_type in [WorkflowType.iWorkflow, WorkflowType.iWorkflowLocal]: transform_type = TransformType.iWorkflow transform_tag = work.get_work_tag() workload_id = work.workload_id diff --git a/main/lib/idds/agents/carrier/plugins/base.py b/main/lib/idds/agents/carrier/plugins/base.py index cbd9ef16..039d6a97 100644 --- a/main/lib/idds/agents/carrier/plugins/base.py +++ b/main/lib/idds/agents/carrier/plugins/base.py @@ -20,9 +20,9 @@ def __init__(self, *args, **kwargs): pass def get_task_params(self, work): - if work.type in [WorkflowType.iWork]: + if work.workflow_type in [WorkflowType.iWork]: task_name = work.name + "_" + str(work.request_id) + "_" + str(work.transform_id) - elif work.type in [WorkflowType.iWorkflow]: + elif work.workflow_type in [WorkflowType.iWorkflow, WorkflowType.iWorkflowLocal]: task_name = work.name + "_" + str(work.request_id) else: task_name = work.name diff --git a/main/lib/idds/agents/transformer/transformer.py b/main/lib/idds/agents/transformer/transformer.py index 40a02eed..62c7f4f6 100644 --- a/main/lib/idds/agents/transformer/transformer.py +++ b/main/lib/idds/agents/transformer/transformer.py @@ -343,7 +343,7 @@ def handle_new_itransform_real(self, transform): self.logger.info(log_pre + "handle_new_itransform: transform_id: %s" % transform['transform_id']) work = transform['transform_metadata']['work'] - if work.type in [WorkflowType.iWork]: + if work.workflow_type in [WorkflowType.iWork]: work.transform_id = transform['transform_id'] # create processing diff --git a/monitor/data/conf.js b/monitor/data/conf.js index 18a92f16..d201da28 100644 --- a/monitor/data/conf.js +++ b/monitor/data/conf.js @@ -1,9 +1,9 @@ var appConfig = { - 'iddsAPI_request': "https://lxplus939.cern.ch:443/idds/monitor_request/null/null", - 'iddsAPI_transform': "https://lxplus939.cern.ch:443/idds/monitor_transform/null/null", - 'iddsAPI_processing': "https://lxplus939.cern.ch:443/idds/monitor_processing/null/null", - 'iddsAPI_request_detail': "https://lxplus939.cern.ch:443/idds/monitor/null/null/true/false/false", - 'iddsAPI_transform_detail': "https://lxplus939.cern.ch:443/idds/monitor/null/null/false/true/false", - 'iddsAPI_processing_detail': "https://lxplus939.cern.ch:443/idds/monitor/null/null/false/false/true" + 'iddsAPI_request': "https://lxplus9106.cern.ch:443/idds/monitor_request/null/null", + 'iddsAPI_transform': "https://lxplus9106.cern.ch:443/idds/monitor_transform/null/null", + 'iddsAPI_processing': "https://lxplus9106.cern.ch:443/idds/monitor_processing/null/null", + 'iddsAPI_request_detail': "https://lxplus9106.cern.ch:443/idds/monitor/null/null/true/false/false", + 'iddsAPI_transform_detail': "https://lxplus9106.cern.ch:443/idds/monitor/null/null/false/true/false", + 'iddsAPI_processing_detail': "https://lxplus9106.cern.ch:443/idds/monitor/null/null/false/false/true" } diff --git a/workflow/lib/idds/iworkflow/asyncresult.py b/workflow/lib/idds/iworkflow/asyncresult.py index c02a422c..a063192c 100644 --- a/workflow/lib/idds/iworkflow/asyncresult.py +++ b/workflow/lib/idds/iworkflow/asyncresult.py @@ -308,12 +308,12 @@ def subscribe_to_messaging_brokers(self): timeout=timeout) conn.set_listener("messag-subscriber", listener) conn.connect(workflow_context.broker_username, workflow_context.broker_password, wait=True) - if workflow_context.type == WorkflowType.iWorkflow: + if workflow_context.workflow_type in [WorkflowType.iWorkflow, WorkflowType.iWorkflowLocal]: subscribe_id = 'idds-workflow_%s' % self.internal_id # subscribe_selector = {'selector': "type = 'iworkflow' AND request_id = %s" % workflow_context.request_id} # subscribe_selector = {'selector': "type = 'iworkflow' AND internal_id = '%s'" % self.internal_id} subscribe_selector = {'selector': "internal_id = '%s'" % self.internal_id} - elif workflow_context.type == WorkflowType.iWork: + elif workflow_context.workflow_type == WorkflowType.iWork: subscribe_id = 'idds-work_%s' % self.internal_id # subscribe_selector = {'selector': "type = 'iwork' AND request_id = %s AND transform_id = %s " % (workflow_context.request_id, # workflow_context.transform_id)} @@ -341,7 +341,7 @@ def publish(self, ret, key=None): key = "%s:%s" % (self._name, key) self.logger.info("publish args (%s) to key: %s" % (str(self._run_group_kwarg), key)) - if workflow_context.type == WorkflowType.iWorkflow: + if workflow_context.workflow_type in [WorkflowType.iWorkflow, WorkflowType.iWorkflowLocal]: headers = {'persistent': 'true', 'type': 'iworkflow', 'internal_id': str(self.internal_id), @@ -354,7 +354,7 @@ def publish(self, ret, key=None): headers=headers ) self.logger.info("publish header: %s, body: %s" % (str(headers), str(body))) - elif workflow_context.type == WorkflowType.iWork: + elif workflow_context.workflow_type == WorkflowType.iWork: headers = {'persistent': 'true', 'type': 'iwork', 'internal_id': str(self.internal_id), diff --git a/workflow/lib/idds/iworkflow/work.py b/workflow/lib/idds/iworkflow/work.py index 0ae3e21b..e981cd0a 100644 --- a/workflow/lib/idds/iworkflow/work.py +++ b/workflow/lib/idds/iworkflow/work.py @@ -28,12 +28,12 @@ class WorkContext(Context): - def __init__(self, name=None, workflow_context=None, source_dir=None): + def __init__(self, name=None, workflow_context=None, source_dir=None, init_env=None): super(WorkContext, self).__init__() self._workflow_context = workflow_context self._transform_id = None self._processing_id = None - self._type = WorkflowType.iWork + self._workflow_type = WorkflowType.iWork self._name = name self._site = None @@ -47,6 +47,8 @@ def __init__(self, name=None, workflow_context=None, source_dir=None): self._map_results = False + self._init_env = init_env + def get_service(self): return self._workflow_context.service @@ -181,12 +183,12 @@ def userdn(self, value): self._workflow_context.userdn = value @property - def type(self): - return self._type + def workflow_type(self): + return self._workflow_type - @type.setter - def type(self, value): - self._type = value + @workflow_type.setter + def workflow_type(self, value): + self._workflow_type = value @property def lifetime(self): @@ -284,6 +286,14 @@ def map_results(self): def map_results(self, value): self._map_results = value + @property + def init_env(self): + return self._init_env + + @init_env.setter + def init_env(self, value): + self._init_env = value + def get_idds_server(self): return self._workflow_context.get_idds_server() @@ -322,7 +332,7 @@ def setup(self): class Work(Base): def __init__(self, func=None, workflow_context=None, context=None, args=None, kwargs=None, group_kwargs=None, - update_kwargs=None, map_results=False, source_dir=None, is_unique_func_name=False): + update_kwargs=None, map_results=False, source_dir=None, init_env=None, is_unique_func_name=False): """ Init a workflow. """ @@ -344,7 +354,7 @@ def __init__(self, func=None, workflow_context=None, context=None, args=None, kw if context: self._context = context else: - self._context = WorkContext(name=self._name, workflow_context=workflow_context) + self._context = WorkContext(name=self._name, workflow_context=workflow_context, init_env=init_env) self.map_results = map_results self._results = None @@ -501,12 +511,12 @@ def userdn(self, value): self._context.userdn = value @property - def type(self): - return self._context.type + def workflow_type(self): + return self._context.workflow_type - @type.setter - def type(self, value): - self._context.type = value + @workflow_type.setter + def workflow_type(self, value): + self._context.workflow_type = value @property def map_results(self): @@ -552,10 +562,10 @@ def group_parameters(self, value): raise Exception("Not allwed to update group parameters") def get_work_tag(self): - return WorkflowType.iWork.name + return self._context.workflow_type.name def get_work_type(self): - return WorkflowType.iWork + return self._context.workflow_type.name def get_work_name(self): return self._name diff --git a/workflow/lib/idds/iworkflow/workflow.py b/workflow/lib/idds/iworkflow/workflow.py index c92664e1..4f324ae4 100644 --- a/workflow/lib/idds/iworkflow/workflow.py +++ b/workflow/lib/idds/iworkflow/workflow.py @@ -51,11 +51,11 @@ def get_current_workflow(cls): class WorkflowContext(Context): - def __init__(self, name=None, service='panda', source_dir=None, type=WorkflowType.iWorkflow, distributed=True, max_walltime=24 * 3600, init_env=None): + def __init__(self, name=None, service='panda', source_dir=None, workflow_type=WorkflowType.iWorkflow, distributed=True, max_walltime=24 * 3600, init_env=None): super(WorkflowContext, self).__init__() self._service = service # panda, idds, sharefs self._request_id = None - self._type = type + self._workflow_type = workflow_type # self.idds_host = None # self.idds_async_host = None @@ -82,7 +82,7 @@ def __init__(self, name=None, service='panda', source_dir=None, type=WorkflowTyp self._username = None self._userdn = None - self._type = type + self._workflow_type = workflow_type self._lifetime = 7 * 24 * 3600 self._workload_id = None self._request_id = None @@ -232,12 +232,12 @@ def userdn(self, value): self._userdn = value @property - def type(self): - return self._type + def workflow_type(self): + return self._workflow_type - @type.setter - def type(self, value): - self._type = value + @workflow_type.setter + def workflow_type(self, value): + self._workflow_type = value @property def lifetime(self): @@ -685,7 +685,7 @@ def __init__(self, func=None, service='panda', context=None, source_dir=None, lo if context is not None: self._context = context else: - self._context = WorkflowContext(name=self._name, service=service, type=workflow_type, source_dir=source_dir, + self._context = WorkflowContext(name=self._name, service=service, workflow_type=workflow_type, source_dir=source_dir, distributed=distributed, init_env=init_env, max_walltime=max_walltime) @property @@ -823,12 +823,12 @@ def userdn(self, value): self._context.userdn = value @property - def type(self): - return self._context.type + def workflow_type(self): + return self._context.workflow_type - @type.setter - def type(self, value): - self._context.type = value + @workflow_type.setter + def workflow_type(self, value): + self._context.workflow_type = value @property def lifetime(self): @@ -858,10 +858,10 @@ def token(self, value): self._context.token = value def get_work_tag(self): - return self._context.type.name + return self._context.workflow_type.name def get_work_type(self): - return self._context.type + return self._context.workflow_type def get_work_name(self): return self._name @@ -930,9 +930,10 @@ def submit_to_panda_server(self): import pandaclient.idds_api as idds_api idds_server = self._context.get_idds_server() - client = idds_api.get_api(idds_utils.json_dumps, + client = idds_api.get_api(dumper=idds_utils.json_dumps, idds_host=idds_server, compress=True, + verbose=True, manager=True) request_id = client.submit(self, username=None, use_dataset_name=False)