Merge pull request #307 from wguanicedew/dev
Dev
wguanicedew authored May 23, 2024
2 parents f3ee26e + 12a7505 commit 8359179
Showing 20 changed files with 477 additions and 105 deletions.
18 changes: 18 additions & 0 deletions common/lib/idds/common/constants.py
@@ -534,6 +534,24 @@ class ReturnCode(IDDSEnum):
     Locked = 1
 
 
+class GracefulEvent(object):
+    def __init__(self):
+        self.__is_set = False
+
+    def set(self):
+        self.__is_set = True
+
+    def is_set(self):
+        return self.__is_set
+
+
+class AsyncResultStatus(IDDSEnum):
+    Running = 0
+    Finished = 1
+    SubFinished = 2
+    Failed = 3
+
+
 def get_work_status_from_transform_processing_status(status):
     if status in [ProcessingStatus.New, TransformStatus.New]:
         return WorkStatus.New
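The new GracefulEvent mirrors the set()/is_set() interface of threading.Event without wrapping a lock, so instances stay picklable and can cross process boundaries; AsyncResultStatus supplies the matching state vocabulary for asynchronous results. A minimal sketch of how such a flag can drive a cooperative stop (the polling loop is illustrative, not from this commit):

    import time

    class GracefulEvent(object):
        # Same class as added in constants.py above.
        def __init__(self):
            self.__is_set = False

        def set(self):
            self.__is_set = True

        def is_set(self):
            return self.__is_set

    def poll_loop(graceful_stop, max_polls=100):
        # Poll until someone flips the stop flag, then exit cleanly.
        for _ in range(max_polls):
            if graceful_stop.is_set():
                return 'stopped gracefully'
            time.sleep(0.01)
        return 'gave up'

    stop = GracefulEvent()
    stop.set()              # e.g. from a signal handler
    print(poll_loop(stop))  # -> stopped gracefully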
2 changes: 1 addition & 1 deletion common/lib/idds/common/imports.py
@@ -83,7 +83,7 @@ def import_func(name: str) -> Callable[..., Any]:
     # module_name_bits, attribute_bits = name_bits[:-1], [name_bits[-1]]
     if module_name_bits == '__main__':
         module_name_bits = filename.replace('.py', '').replace('.pyc', '')
-        module_name_bits = module_name_bits.replace('/', '')
+        module_name_bits = module_name_bits.replace('/', '.')
     module_name_bits = module_name_bits.split('.')
     attribute_bits = attribute_bits.split('.')
     module = None
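The one-character fix above matters when import_func converts a __main__ script path back into a module name: replacing '/' with '' fused the path components into one unimportable token, while '.' yields a proper dotted path. A quick illustration (the filename is hypothetical):

    filename = 'idds/tests/my_script.py'  # hypothetical script path
    module_name = filename.replace('.py', '').replace('.pyc', '')
    print(module_name.replace('/', ''))   # 'iddstestsmy_script' -- old, wrong
    print(module_name.replace('/', '.'))  # 'idds.tests.my_script' -- fixed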
7 changes: 6 additions & 1 deletion common/lib/idds/common/utils.py
@@ -76,7 +76,12 @@ def setup_logging(name, stream=None, loglevel=None):
         loglevel = getattr(logging, loglevel)
 
     if stream is None:
-        if config_has_section('common') and config_has_option('common', 'logdir'):
+        if os.environ.get('IDDS_LOG_FILE', None):
+            idds_log_file = os.environ.get('IDDS_LOG_FILE', None)
+            logging.basicConfig(filename=idds_log_file,
+                                level=loglevel,
+                                format='%(asctime)s\t%(threadName)s\t%(name)s\t%(levelname)s\t%(message)s')
+        elif config_has_section('common') and config_has_option('common', 'logdir'):
             logging.basicConfig(filename=os.path.join(config_get('common', 'logdir'), name),
                                 level=loglevel,
                                 format='%(asctime)s\t%(threadName)s\t%(name)s\t%(levelname)s\t%(message)s')
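setup_logging now lets an IDDS_LOG_FILE environment variable take precedence over the [common] logdir config option when no stream is given. A usage sketch (the log path is illustrative):

    import os

    # Must be set before the first setup_logging() call, since
    # logging.basicConfig() only configures the root logger once.
    os.environ['IDDS_LOG_FILE'] = '/tmp/idds_agent.log'

    from idds.common.utils import setup_logging
    setup_logging(__name__)  # records now land in /tmp/idds_agent.log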
1 change: 1 addition & 0 deletions main/etc/idds/idds.cfg.template
@@ -27,6 +27,7 @@ loglevel = DEBUG
 # dev: aipanda104
 # doma: aipanda105-107
 # idds-mon: aipanda108
+
 [database]
 #default = mysql://idds:[email protected]/idds
 #default = mysql://idds:[email protected]/idds
2 changes: 1 addition & 1 deletion main/lib/idds/agents/carrier/poller.py
@@ -538,7 +538,7 @@ def process_update_processing(self, event):
 
         pr = self.get_processing(processing_id=event._processing_id, status=None, locking=True)
         if not pr:
-            self.logger.error("Cannot find processing for event: %s" % str(event))
+            self.logger.warn("Cannot find processing for event: %s" % str(event))
             pro_ret = ReturnCode.Locked.value
         else:
             log_pre = self.get_log_prefix(pr)
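The same error-to-warn downgrade recurs below in the submitter, trigger, clerk, and transformer: a row that cannot be fetched with locking=True is usually just held by a concurrent agent, an expected race the handler already absorbs by returning ReturnCode.Locked. One side note, not part of this commit: Logger.warn is a deprecated alias in the standard library, so the modern equivalent would be:

    import logging

    logger = logging.getLogger('idds.agents.carrier.poller')
    # warn() still works but has long been a deprecated alias of warning().
    logger.warning('Cannot find processing for event: %s' % 'event-123')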
2 changes: 1 addition & 1 deletion main/lib/idds/agents/carrier/submitter.py
@@ -261,7 +261,7 @@ def process_new_processing(self, event):
         self.logger.info("process_new_processing, event: %s" % str(event))
         pr = self.get_processing(processing_id=event._processing_id, status=None, locking=True)
         if not pr:
-            self.logger.error("Cannot find processing for event: %s" % str(event))
+            self.logger.warn("Cannot find processing for event: %s" % str(event))
         else:
             log_pre = self.get_log_prefix(pr)
             self.logger.info(log_pre + "process_new_processing")
2 changes: 1 addition & 1 deletion main/lib/idds/agents/carrier/trigger.py
@@ -196,7 +196,7 @@ def process_trigger_processing_real(self, event):
         self.logger.info("process_trigger_processing, event: %s" % str(event))
         pr = self.get_processing(processing_id=event._processing_id, status=None, locking=True)
         if not pr:
-            self.logger.error("Cannot find processing for event: %s" % str(event))
+            self.logger.warn("Cannot find processing for event: %s" % str(event))
             pro_ret = ReturnCode.Locked.value
         else:
             log_pre = self.get_log_prefix(pr)
6 changes: 3 additions & 3 deletions main/lib/idds/agents/clerk/clerk.py
@@ -1193,7 +1193,7 @@ def handle_update_irequest_real(self, req, event):
                 failed_tfs += 1
 
         req_status = RequestStatus.Transforming
-        if req['request_type'] in [RequestType.iWorkflowLocal] and total_tfs == 0:
+        if req['request_type'] in [RequestType.iWorkflowLocal]:
            workflow = req['request_metadata'].get('workflow', None)
            if workflow and req['created_at'] + datetime.timedelta(seconds=workflow.max_walltime) < datetime.datetime.utcnow():
                req_status = RequestStatus.Finished
@@ -1369,7 +1369,7 @@ def process_abort_request(self, event):
         if event:
             req = self.get_request(request_id=event._request_id, locking=True)
             if not req:
-                self.logger.error("Cannot find request for event: %s" % str(event))
+                self.logger.warn("Cannot find request for event: %s" % str(event))
                 pro_ret = ReturnCode.Locked.value
             else:
                 log_pre = self.get_log_prefix(req)
@@ -1517,7 +1517,7 @@ def process_close_request(self, event):
         if event:
             req = self.get_request(request_id=event._request_id, locking=True)
             if not req:
-                self.logger.error("Cannot find request for event: %s" % str(event))
+                self.logger.warn("Cannot find request for event: %s" % str(event))
                 pro_ret = ReturnCode.Locked.value
             else:
                 log_pre = self.get_log_prefix(req)
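Dropping the total_tfs == 0 guard means every iWorkflowLocal request is now tested against its workflow's max_walltime, not only requests with no transforms yet. The expiry test itself, as a standalone sketch:

    import datetime

    def walltime_expired(created_at, max_walltime, now=None):
        # True once the request is older than max_walltime seconds.
        now = now or datetime.datetime.utcnow()
        return created_at + datetime.timedelta(seconds=max_walltime) < now

    created_at = datetime.datetime.utcnow() - datetime.timedelta(hours=2)
    print(walltime_expired(created_at, max_walltime=3600))  # True: 2 h old, 1 h limit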
2 changes: 1 addition & 1 deletion main/lib/idds/agents/transformer/transformer.py
@@ -496,7 +496,7 @@ def process_new_transform(self, event):
         tf_status = [TransformStatus.New, TransformStatus.Ready, TransformStatus.Extend]
         tf = self.get_transform(transform_id=event._transform_id, status=tf_status, locking=True)
         if not tf:
-            self.logger.error("Cannot find transform for event: %s" % str(event))
+            self.logger.warn("Cannot find transform for event: %s" % str(event))
         else:
             log_pre = self.get_log_prefix(tf)
             self.logger.info(log_pre + "process_new_transform")
11 changes: 11 additions & 0 deletions main/lib/idds/tests/panda_test.py
@@ -55,6 +55,17 @@
 task_ids = [168747, 168761, 168763]
 task_ids = [13413]
 task_ids = [168859, 168861, 168862]
+task_ids = [i for i in range(9021, 9222)]
+task_ids = [i for i in range(169155, 169178)]
+task_ids = [169182, 169183, 169184]
+
+task_ids = [5975, 8442, 10741, 10742, 10744, 10745, 10746, 10747]
+task_ids = [15507, 15516, 15520, 15526, 15534, 15535, 15539, 15679, 15715]
+task_ids = [169181, 169198, 169199, 169201, 169206] + [i for i in range(169210, 169232)]
+task_ids = [169236, 169237, 169238, 169239, 169240, 169241]
+task_ids = [169272, 169273, 169312, 169313]
+task_ids = [169307, 169308, 169309, 169310, 169311, 169312, 169313, 169314]
+task_ids = [i for i in range(169359, 169363)]
 for task_id in task_ids:
     print("Killing %s" % task_id)
     ret = Client.killTask(task_id, verbose=True)
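Only the last task_ids assignment takes effect; the earlier lists are dead assignments kept as a scratch history of previous kill batches, so this run kills tasks 169359-169362.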
62 changes: 61 additions & 1 deletion main/lib/idds/tests/test_domapanda.py
@@ -47,6 +47,8 @@
     task_queue2 = 'CC-IN2P3_Rubin_Himem'
     task_queue3 = 'CC-IN2P3_Rubin_Extra_Himem'
     task_queue4 = 'CC-IN2P3_Rubin_Merge'
+    # task_queue5 = 'CC-IN2P3_Rubin_IO'
+    task_queue5 = 'CC-IN2P3_Rubin_Extra_Himem'
 elif len(sys.argv) > 1 and sys.argv[1] == "lancs":
     site = 'lancs'
     task_cloud = 'EU'
@@ -55,8 +57,10 @@
     task_queue1 = 'LANCS_Rubin_Medium'
     task_queue2 = 'LANCS_Rubin_Himem'
     task_queue3 = 'LANCS_Rubin_Extra_Himem'
-    task_queue3 = 'LANCS_Rubin_Himem'
+    # task_queue3 = 'LANCS_Rubin_Himem'
     task_queue4 = 'LANCS_Rubin_Merge'
+    # task_queue5 = 'LANCS_Rubin_IO'
+    task_queue5 = 'LANCS_Rubin_Extra_Himem'
 else:
     site = 'slac'
     # task_cloud = 'LSST'
@@ -71,6 +75,7 @@
     task_queue2 = 'SLAC_Rubin_Himem'
     task_queue3 = 'SLAC_Rubin_Extra_Himem'
     task_queue4 = 'SLAC_Rubin_Merge'
+    task_queue5 = 'SLAC_Rubin_IO'
     # task_queue = 'SLAC_Rubin_Extra_Himem_32Cores'
     # task_queue = 'SLAC_Rubin_Merge'
     # task_queue = 'SLAC_TEST'
@@ -187,6 +192,26 @@ def setup_workflow():
                   "submitted": False} for k in range(6)
     ]
 
+    taskN6 = PanDATask()
+    taskN6.step = "step6"
+    taskN6.name = site + "_" + taskN6.step + "_" + randStr()
+    taskN6.dependencies = [
+        {"name": "00006" + str(k),
+         "order_id": k,
+         "dependencies": [],
+         "submitted": False} for k in range(6)
+    ]
+
+    taskN7 = PanDATask()
+    taskN7.step = "step7"
+    taskN7.name = site + "_" + taskN7.step + "_" + randStr()
+    taskN7.dependencies = [
+        {"name": "00007" + str(k),
+         "order_id": k,
+         "dependencies": [],
+         "submitted": False} for k in range(6)
+    ]
+
     work1 = DomaPanDAWork(executable='echo',
                           primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
                           output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
@@ -265,6 +290,39 @@ def setup_workflow():
                                    "value": "log.tgz"},
                          task_cloud=task_cloud)
 
+    work6 = DomaPanDAWork(executable='echo',
+                          primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
+                          output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
+                          log_collections=[], dependency_map=taskN6.dependencies,
+                          task_name=taskN6.name, task_queue=task_queue5,
+                          encode_command_line=True,
+                          task_priority=981,
+                          prodSourceLabel='managed',
+                          task_log={"dataset": "PandaJob_#{pandaid}/",
+                                    "destination": "local",
+                                    "param_type": "log",
+                                    "token": "local",
+                                    "type": "template",
+                                    "value": "log.tgz"},
+                          task_cloud=task_cloud)
+
+    work7 = DomaPanDAWork(executable='echo',
+                          primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
+                          output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
+                          log_collections=[], dependency_map=taskN7.dependencies,
+                          task_name=taskN7.name, task_queue=task_queue3,
+                          encode_command_line=True,
+                          task_priority=981,
+                          core_count=2,
+                          prodSourceLabel='managed',
+                          task_log={"dataset": "PandaJob_#{pandaid}/",
+                                    "destination": "local",
+                                    "param_type": "log",
+                                    "token": "local",
+                                    "type": "template",
+                                    "value": "log.tgz"},
+                          task_cloud=task_cloud)
+
     pending_time = 12
     # pending_time = None
     workflow = Workflow(pending_time=pending_time)
@@ -273,6 +331,8 @@ def setup_workflow():
     workflow.add_work(work3)
     workflow.add_work(work4)
     workflow.add_work(work5)
+    workflow.add_work(work6)
+    workflow.add_work(work7)
     workflow.name = site + "_" + 'test_workflow.idds.%s.test' % time.time()
     return workflow
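The two new works follow the existing pattern: work6 targets the new task_queue5 (the commented-out *_Rubin_IO names suggest an IO queue is the eventual target, with Extra_Himem standing in at CC-IN2P3 and LANCS), while work7 stays on the extra-himem queue but requests core_count=2.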
5 changes: 5 additions & 0 deletions main/lib/idds/tests/test_get_source_code.py
@@ -17,3 +17,8 @@ def foo(arg1, arg2):
 
 print(inspect.signature(foo))
 print(dill.dumps(foo))
+
+foo_str = dill.dumps(foo)
+foo_1 = dill.loads(foo_str)
+ret = foo_1(1, 3)
+print(ret)
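The added lines extend the introspection demo into a full round trip: dill.dumps(foo) serializes the function object itself (code included), dill.loads rebuilds it as foo_1, and foo_1(1, 3) confirms the reconstructed function still computes the same result.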
11 changes: 8 additions & 3 deletions main/tools/env/setup_panda.sh
@@ -27,7 +27,7 @@ if [ "$instance" == "k8s" ]; then
     export PANDA_SYS=/afs/cern.ch/user/w/wguan/workdisk/iDDS/.conda/iDDS/
     # export PANDA_CONFIG_ROOT=/afs/cern.ch/user/w/wguan/workdisk/iDDS/main/etc/panda/
     export PANDA_CONFIG_ROOT=~/.panda/
-elif [ "$instance" == "slac" ]; then
+elif [ "$instance" == "usdf_dev" ]; then
     export PANDA_AUTH=oidc
     export PANDA_BEHIND_REAL_LB=true
     export PANDA_VERIFY_HOST=off
@@ -36,11 +36,15 @@ elif [ "$instance" == "slac" ]; then
     export PANDAMON_URL=https://rubin-panda-bigmon-dev.slac.stanford.edu
     export PANDA_AUTH_VO=Rubin
 
+    # export PANDA_AUTH_VO=Rubin.production
+
     export PANDACACHE_URL=$PANDA_URL_SSL
     export PANDA_SYS=/afs/cern.ch/user/w/wguan/workdisk/iDDS/.conda/iDDS/
 
     # export PANDA_CONFIG_ROOT=/afs/cern.ch/user/w/wguan/workdisk/iDDS/main/etc/panda/
     export PANDA_CONFIG_ROOT=~/.panda/
+
+    export IDDS_OIDC_TOKEN_FILE=~/.idds/.token_rubin_dev
 elif [ "$instance" == "usdf" ]; then
     export PANDA_AUTH=oidc
     export PANDA_BEHIND_REAL_LB=true
@@ -49,13 +53,14 @@ elif [ "$instance" == "usdf" ]; then
     export PANDA_URL=https://usdf-panda-server.slac.stanford.edu:8443/server/panda
     export PANDACACHE_URL=$PANDA_URL_SSL
     export PANDAMON_URL=https://usdf-panda-bigmon.slac.stanford.edu:8443/
-    export PANDA_AUTH_VO=Rubin:production
+    export PANDA_AUTH_VO=Rubin.production
 
     export PANDACACHE_URL=$PANDA_URL_SSL
     export PANDA_SYS=/afs/cern.ch/user/w/wguan/workdisk/iDDS/.conda/iDDS/
 
     # export PANDA_CONFIG_ROOT=/afs/cern.ch/user/w/wguan/workdisk/iDDS/main/etc/panda/
     export PANDA_CONFIG_ROOT=~/.panda/
+    export IDDS_OIDC_TOKEN_FILE=~/.idds/.token_rubin_prod
 elif [ "$instance" == "new" ]; then
     export PANDA_AUTH=oidc
     export PANDA_URL_SSL=https://ai-idds-05.cern.ch:25443/server/panda
@@ -107,7 +112,7 @@ else
     # export IDDS_HOST=https://aipanda104.cern.ch:443/idds
 
     # doma
-    export IDDS_HOST=https://aipanda105.cern.ch:443/idds
+    # export IDDS_HOST=https://aipanda105.cern.ch:443/idds
 
     # export IDDS_BROKERS=atlas-test-mb.cern.ch:61013
     # export IDDS_BROKER_DESTINATION=/topic/doma.idds
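Two fixes ride along with the slac-to-usdf_dev rename: PANDA_AUTH_VO for the usdf production instance is corrected from Rubin:production to Rubin.production, and the dev and production Rubin instances now point IDDS_OIDC_TOKEN_FILE at separate token files (.token_rubin_dev vs .token_rubin_prod), presumably so that switching instances does not reuse a stale OIDC token. Assuming the script maps its first argument to $instance, sourcing it with the argument usdf_dev selects the dev block.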
12 changes: 6 additions & 6 deletions monitor/data/conf.js
@@ -1,9 +1,9 @@
 
 var appConfig = {
-    'iddsAPI_request': "https://lxplus935.cern.ch:443/idds/monitor_request/null/null",
-    'iddsAPI_transform': "https://lxplus935.cern.ch:443/idds/monitor_transform/null/null",
-    'iddsAPI_processing': "https://lxplus935.cern.ch:443/idds/monitor_processing/null/null",
-    'iddsAPI_request_detail': "https://lxplus935.cern.ch:443/idds/monitor/null/null/true/false/false",
-    'iddsAPI_transform_detail': "https://lxplus935.cern.ch:443/idds/monitor/null/null/false/true/false",
-    'iddsAPI_processing_detail': "https://lxplus935.cern.ch:443/idds/monitor/null/null/false/false/true"
+    'iddsAPI_request': "https://lxplus954.cern.ch:443/idds/monitor_request/null/null",
+    'iddsAPI_transform': "https://lxplus954.cern.ch:443/idds/monitor_transform/null/null",
+    'iddsAPI_processing': "https://lxplus954.cern.ch:443/idds/monitor_processing/null/null",
+    'iddsAPI_request_detail': "https://lxplus954.cern.ch:443/idds/monitor/null/null/true/false/false",
+    'iddsAPI_transform_detail': "https://lxplus954.cern.ch:443/idds/monitor/null/null/false/true/false",
+    'iddsAPI_processing_detail': "https://lxplus954.cern.ch:443/idds/monitor/null/null/false/false/true"
 }
24 changes: 15 additions & 9 deletions workflow/bin/run_workflow
@@ -37,13 +37,15 @@ setup_logging(__name__, stream=sys.stdout)
 
 
 def get_context_args(context, original_args, current_job_kwargs):
-    func_name, args, kwargs, multi_jobs_kwargs_list = None, None, None, None
+    func_name, pre_kwargs, args, kwargs, multi_jobs_kwargs_list = None, None, None, None, None
     if original_args:
         original_args = json_loads(original_args)
-        func_name, args, kwargs, multi_jobs_kwargs_list = original_args
+        func_name, pre_kwargs, args, kwargs, multi_jobs_kwargs_list = original_args
 
     if args:
         args = pickle.loads(base64.b64decode(args))
+    if pre_kwargs:
+        pre_kwargs = pickle.loads(base64.b64decode(pre_kwargs))
     if kwargs:
         kwargs = pickle.loads(base64.b64decode(kwargs))
     if multi_jobs_kwargs_list:
@@ -65,21 +67,23 @@ def get_context_args(context, original_args, current_job_kwargs):
             # kwargs.update(current_job_kwargs)
     except Exception as ex:
         logging.error("Failed to update kwargs: %s" % ex)
-    return context, func_name, args, kwargs, multi_jobs_kwargs_list, current_job_kwargs
+    return context, func_name, pre_kwargs, args, kwargs, multi_jobs_kwargs_list, current_job_kwargs
 
 
 def run_workflow(context, original_args, current_job_kwargs):
-    context, func_name, args, kwargs, multi_jobs_kwargs_list, current_job_kwargs = get_context_args(context, original_args, current_job_kwargs)
+    context, func_name, pre_kwargs, args, kwargs, multi_jobs_kwargs_list, current_job_kwargs = get_context_args(context, original_args, current_job_kwargs)
     logging.info("context: %s" % context)
     logging.info("func_name: %s" % func_name)
+    logging.info("pre_kwargs: %s" % pre_kwargs)
     logging.info("args: %s" % str(args))
     logging.info("kwargs: %s" % kwargs)
-    logging.info("multi_jobs_kwargs_list: %s" % multi_jobs_kwargs_list)
+    logging.info("multi_jobs_kwargs_list: %s" % str(multi_jobs_kwargs_list))
+    logging.info("current_job_kwargs: %s" % str(current_job_kwargs))
 
     context.initialize()
     context.setup_source_files()
 
-    workflow = Workflow(func=func_name, args=args, kwargs=kwargs, multi_jobs_kwargs_list=multi_jobs_kwargs_list, current_job_kwargs=current_job_kwargs, context=context)
+    workflow = Workflow(func=func_name, pre_kwargs=pre_kwargs, args=args, kwargs=kwargs, multi_jobs_kwargs_list=multi_jobs_kwargs_list, current_job_kwargs=current_job_kwargs, context=context)
     logging.info("workflow: %s" % workflow)
     with workflow:
         ret = workflow.run()
@@ -88,17 +92,19 @@ def run_workflow(context, original_args, current_job_kwargs):
 
 
 def run_work(context, original_args, current_job_kwargs):
-    context, func_name, args, kwargs, multi_jobs_kwargs_list, current_job_kwargs = get_context_args(context, original_args, current_job_kwargs)
+    context, func_name, pre_kwargs, args, kwargs, multi_jobs_kwargs_list, current_job_kwargs = get_context_args(context, original_args, current_job_kwargs)
     logging.info("context: %s" % context)
     logging.info("func_name: %s" % func_name)
+    logging.info("pre_kwargs: %s" % pre_kwargs)
     logging.info("args: %s" % str(args))
     logging.info("kwargs: %s" % kwargs)
-    logging.info("multi_jobs_kwargs_list: %s" % multi_jobs_kwargs_list)
+    logging.info("multi_jobs_kwargs_list: %s" % str(multi_jobs_kwargs_list))
+    logging.info("current_job_kwargs: %s" % str(current_job_kwargs))
 
     context.initialize()
     context.setup_source_files()
 
-    work = Work(func=func_name, args=args, kwargs=kwargs, multi_jobs_kwargs_list=multi_jobs_kwargs_list, current_job_kwargs=current_job_kwargs, context=context)
+    work = Work(func=func_name, pre_kwargs=pre_kwargs, args=args, kwargs=kwargs, multi_jobs_kwargs_list=multi_jobs_kwargs_list, current_job_kwargs=current_job_kwargs, context=context)
     logging.info("work: %s" % work)
     ret = work.run()
     logging.info("run work result: %s" % str(ret))
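get_context_args now unpacks five elements from original_args, with the new pre_kwargs slot base64+pickle encoded the same way as args and kwargs. A sketch of the producer side under those assumptions (stdlib json stands in for the idds json_loads counterpart; all values are illustrative):

    import base64
    import json
    import pickle

    def encode(obj):
        # Mirror of what get_context_args undoes: pickle, then base64.
        return base64.b64encode(pickle.dumps(obj)).decode()

    original_args = json.dumps([
        'my_module:my_func',      # func_name (illustrative)
        encode({'setup': True}),  # pre_kwargs -- the new slot
        encode((1, 2)),           # args
        encode({'x': 3}),         # kwargs
        None,                     # multi_jobs_kwargs_list
    ])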