Skip to content

Commit

Permalink
Merge pull request #290 from HSF/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
wguanicedew authored Mar 29, 2024
2 parents f28c8f2 + 526b1da commit 23898ec
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 50 deletions.
8 changes: 4 additions & 4 deletions client/lib/idds/client/clientmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,14 +437,14 @@ def submit(self, workflow, username=None, userdn=None, use_dataset_name=False):
transform_tag = 'workflow'
priority = 0
try:
if workflow.type in [WorkflowType.iWorkflow]:
if workflow.workflow_type in [WorkflowType.iWorkflow]:
scope = 'iworkflow'
request_type = RequestType.iWorkflow
transform_tag = workflow.get_work_tag()
priority = workflow.priority
if priority is None:
priority = 0
elif workflow.type in [WorkflowType.iWorkflowLocal]:
elif workflow.workflow_type in [WorkflowType.iWorkflowLocal]:
scope = 'iworkflowLocal'
request_type = RequestType.iWorkflowLocal
transform_tag = workflow.get_work_tag()
Expand Down Expand Up @@ -507,14 +507,14 @@ def submit_work(self, request_id, work, use_dataset_name=False):
priority = 0
workload_id = None
try:
if work.type in [WorkflowType.iWork]:
if work.workflow_type in [WorkflowType.iWork]:
transform_type = TransformType.iWork
transform_tag = work.get_work_tag()
workload_id = work.workload_id
priority = work.priority
if priority is None:
priority = 0
elif work.type in [WorkflowType.iWorkflow]:
elif work.workflow_type in [WorkflowType.iWorkflow, WorkflowType.iWorkflowLocal]:
transform_type = TransformType.iWorkflow
transform_tag = work.get_work_tag()
workload_id = work.workload_id
Expand Down
4 changes: 2 additions & 2 deletions main/lib/idds/agents/carrier/plugins/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ def __init__(self, *args, **kwargs):
pass

def get_task_params(self, work):
if work.type in [WorkflowType.iWork]:
if work.workflow_type in [WorkflowType.iWork]:
task_name = work.name + "_" + str(work.request_id) + "_" + str(work.transform_id)
elif work.type in [WorkflowType.iWorkflow]:
elif work.workflow_type in [WorkflowType.iWorkflow, WorkflowType.iWorkflowLocal]:
task_name = work.name + "_" + str(work.request_id)
else:
task_name = work.name
Expand Down
2 changes: 1 addition & 1 deletion main/lib/idds/agents/transformer/transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ def handle_new_itransform_real(self, transform):
self.logger.info(log_pre + "handle_new_itransform: transform_id: %s" % transform['transform_id'])

work = transform['transform_metadata']['work']
if work.type in [WorkflowType.iWork]:
if work.workflow_type in [WorkflowType.iWork]:
work.transform_id = transform['transform_id']

# create processing
Expand Down
12 changes: 6 additions & 6 deletions monitor/data/conf.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@

var appConfig = {
'iddsAPI_request': "https://lxplus939.cern.ch:443/idds/monitor_request/null/null",
'iddsAPI_transform': "https://lxplus939.cern.ch:443/idds/monitor_transform/null/null",
'iddsAPI_processing': "https://lxplus939.cern.ch:443/idds/monitor_processing/null/null",
'iddsAPI_request_detail': "https://lxplus939.cern.ch:443/idds/monitor/null/null/true/false/false",
'iddsAPI_transform_detail': "https://lxplus939.cern.ch:443/idds/monitor/null/null/false/true/false",
'iddsAPI_processing_detail': "https://lxplus939.cern.ch:443/idds/monitor/null/null/false/false/true"
'iddsAPI_request': "https://lxplus9106.cern.ch:443/idds/monitor_request/null/null",
'iddsAPI_transform': "https://lxplus9106.cern.ch:443/idds/monitor_transform/null/null",
'iddsAPI_processing': "https://lxplus9106.cern.ch:443/idds/monitor_processing/null/null",
'iddsAPI_request_detail': "https://lxplus9106.cern.ch:443/idds/monitor/null/null/true/false/false",
'iddsAPI_transform_detail': "https://lxplus9106.cern.ch:443/idds/monitor/null/null/false/true/false",
'iddsAPI_processing_detail': "https://lxplus9106.cern.ch:443/idds/monitor/null/null/false/false/true"
}
8 changes: 4 additions & 4 deletions workflow/lib/idds/iworkflow/asyncresult.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,12 +308,12 @@ def subscribe_to_messaging_brokers(self):
timeout=timeout)
conn.set_listener("messag-subscriber", listener)
conn.connect(workflow_context.broker_username, workflow_context.broker_password, wait=True)
if workflow_context.type == WorkflowType.iWorkflow:
if workflow_context.workflow_type in [WorkflowType.iWorkflow, WorkflowType.iWorkflowLocal]:
subscribe_id = 'idds-workflow_%s' % self.internal_id
# subscribe_selector = {'selector': "type = 'iworkflow' AND request_id = %s" % workflow_context.request_id}
# subscribe_selector = {'selector': "type = 'iworkflow' AND internal_id = '%s'" % self.internal_id}
subscribe_selector = {'selector': "internal_id = '%s'" % self.internal_id}
elif workflow_context.type == WorkflowType.iWork:
elif workflow_context.workflow_type == WorkflowType.iWork:
subscribe_id = 'idds-work_%s' % self.internal_id
# subscribe_selector = {'selector': "type = 'iwork' AND request_id = %s AND transform_id = %s " % (workflow_context.request_id,
# workflow_context.transform_id)}
Expand Down Expand Up @@ -341,7 +341,7 @@ def publish(self, ret, key=None):
key = "%s:%s" % (self._name, key)
self.logger.info("publish args (%s) to key: %s" % (str(self._run_group_kwarg), key))

if workflow_context.type == WorkflowType.iWorkflow:
if workflow_context.workflow_type in [WorkflowType.iWorkflow, WorkflowType.iWorkflowLocal]:
headers = {'persistent': 'true',
'type': 'iworkflow',
'internal_id': str(self.internal_id),
Expand All @@ -354,7 +354,7 @@ def publish(self, ret, key=None):
headers=headers
)
self.logger.info("publish header: %s, body: %s" % (str(headers), str(body)))
elif workflow_context.type == WorkflowType.iWork:
elif workflow_context.workflow_type == WorkflowType.iWork:
headers = {'persistent': 'true',
'type': 'iwork',
'internal_id': str(self.internal_id),
Expand Down
42 changes: 26 additions & 16 deletions workflow/lib/idds/iworkflow/work.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@

class WorkContext(Context):

def __init__(self, name=None, workflow_context=None, source_dir=None):
def __init__(self, name=None, workflow_context=None, source_dir=None, init_env=None):
super(WorkContext, self).__init__()
self._workflow_context = workflow_context
self._transform_id = None
self._processing_id = None
self._type = WorkflowType.iWork
self._workflow_type = WorkflowType.iWork

self._name = name
self._site = None
Expand All @@ -47,6 +47,8 @@ def __init__(self, name=None, workflow_context=None, source_dir=None):

self._map_results = False

self._init_env = init_env

def get_service(self):
return self._workflow_context.service

Expand Down Expand Up @@ -181,12 +183,12 @@ def userdn(self, value):
self._workflow_context.userdn = value

@property
def type(self):
return self._type
def workflow_type(self):
return self._workflow_type

@type.setter
def type(self, value):
self._type = value
@workflow_type.setter
def workflow_type(self, value):
self._workflow_type = value

@property
def lifetime(self):
Expand Down Expand Up @@ -284,6 +286,14 @@ def map_results(self):
def map_results(self, value):
self._map_results = value

@property
def init_env(self):
return self._init_env

@init_env.setter
def init_env(self, value):
self._init_env = value

def get_idds_server(self):
return self._workflow_context.get_idds_server()

Expand Down Expand Up @@ -322,7 +332,7 @@ def setup(self):
class Work(Base):

def __init__(self, func=None, workflow_context=None, context=None, args=None, kwargs=None, group_kwargs=None,
update_kwargs=None, map_results=False, source_dir=None, is_unique_func_name=False):
update_kwargs=None, map_results=False, source_dir=None, init_env=None, is_unique_func_name=False):
"""
Init a workflow.
"""
Expand All @@ -344,7 +354,7 @@ def __init__(self, func=None, workflow_context=None, context=None, args=None, kw
if context:
self._context = context
else:
self._context = WorkContext(name=self._name, workflow_context=workflow_context)
self._context = WorkContext(name=self._name, workflow_context=workflow_context, init_env=init_env)

self.map_results = map_results
self._results = None
Expand Down Expand Up @@ -501,12 +511,12 @@ def userdn(self, value):
self._context.userdn = value

@property
def type(self):
return self._context.type
def workflow_type(self):
return self._context.workflow_type

@type.setter
def type(self, value):
self._context.type = value
@workflow_type.setter
def workflow_type(self, value):
self._context.workflow_type = value

@property
def map_results(self):
Expand Down Expand Up @@ -552,10 +562,10 @@ def group_parameters(self, value):
raise Exception("Not allwed to update group parameters")

def get_work_tag(self):
return WorkflowType.iWork.name
return self._context.workflow_type.name

def get_work_type(self):
return WorkflowType.iWork
return self._context.workflow_type.name

def get_work_name(self):
return self._name
Expand Down
35 changes: 18 additions & 17 deletions workflow/lib/idds/iworkflow/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ def get_current_workflow(cls):


class WorkflowContext(Context):
def __init__(self, name=None, service='panda', source_dir=None, type=WorkflowType.iWorkflow, distributed=True, max_walltime=24 * 3600, init_env=None):
def __init__(self, name=None, service='panda', source_dir=None, workflow_type=WorkflowType.iWorkflow, distributed=True, max_walltime=24 * 3600, init_env=None):
super(WorkflowContext, self).__init__()
self._service = service # panda, idds, sharefs
self._request_id = None
self._type = type
self._workflow_type = workflow_type

# self.idds_host = None
# self.idds_async_host = None
Expand All @@ -82,7 +82,7 @@ def __init__(self, name=None, service='panda', source_dir=None, type=WorkflowTyp

self._username = None
self._userdn = None
self._type = type
self._workflow_type = workflow_type
self._lifetime = 7 * 24 * 3600
self._workload_id = None
self._request_id = None
Expand Down Expand Up @@ -232,12 +232,12 @@ def userdn(self, value):
self._userdn = value

@property
def type(self):
return self._type
def workflow_type(self):
return self._workflow_type

@type.setter
def type(self, value):
self._type = value
@workflow_type.setter
def workflow_type(self, value):
self._workflow_type = value

@property
def lifetime(self):
Expand Down Expand Up @@ -685,7 +685,7 @@ def __init__(self, func=None, service='panda', context=None, source_dir=None, lo
if context is not None:
self._context = context
else:
self._context = WorkflowContext(name=self._name, service=service, type=workflow_type, source_dir=source_dir,
self._context = WorkflowContext(name=self._name, service=service, workflow_type=workflow_type, source_dir=source_dir,
distributed=distributed, init_env=init_env, max_walltime=max_walltime)

@property
Expand Down Expand Up @@ -823,12 +823,12 @@ def userdn(self, value):
self._context.userdn = value

@property
def type(self):
return self._context.type
def workflow_type(self):
return self._context.workflow_type

@type.setter
def type(self, value):
self._context.type = value
@workflow_type.setter
def workflow_type(self, value):
self._context.workflow_type = value

@property
def lifetime(self):
Expand Down Expand Up @@ -858,10 +858,10 @@ def token(self, value):
self._context.token = value

def get_work_tag(self):
return self._context.type.name
return self._context.workflow_type.name

def get_work_type(self):
return self._context.type
return self._context.workflow_type

def get_work_name(self):
return self._name
Expand Down Expand Up @@ -930,9 +930,10 @@ def submit_to_panda_server(self):
import pandaclient.idds_api as idds_api

idds_server = self._context.get_idds_server()
client = idds_api.get_api(idds_utils.json_dumps,
client = idds_api.get_api(dumper=idds_utils.json_dumps,
idds_host=idds_server,
compress=True,
verbose=True,
manager=True)
request_id = client.submit(self, username=None, use_dataset_name=False)

Expand Down

0 comments on commit 23898ec

Please sign in to comment.