diff --git a/atlas/lib/idds/atlas/workflow/atlasstageinwork.py b/atlas/lib/idds/atlas/workflow/atlasstageinwork.py
index f64275ae..65e8aca4 100644
--- a/atlas/lib/idds/atlas/workflow/atlasstageinwork.py
+++ b/atlas/lib/idds/atlas/workflow/atlasstageinwork.py
@@ -293,7 +293,14 @@ def poll_rule(self, processing):
         # p = processing
         # rule_id = p['processing_metadata']['rule_id']
         proc = processing['processing_metadata']['processing']
-        rule_id = proc.external_id
+
+        if proc.external_id:
+            rule_id = proc.external_id
+        elif self.rule_id:
+            rule_id = self.rule_id
+        else:
+            rule_id = proc.processing_metadata.get('rule_id', None)
+
         self.logger.debug("rule_id: %s" % rule_id)
         replicases_status = {}
         if rule_id:
diff --git a/atlas/lib/idds/atlas/workflowv2/atlasstageinwork.py b/atlas/lib/idds/atlas/workflowv2/atlasstageinwork.py
index deb9730a..ed839b84 100644
--- a/atlas/lib/idds/atlas/workflowv2/atlasstageinwork.py
+++ b/atlas/lib/idds/atlas/workflowv2/atlasstageinwork.py
@@ -293,7 +293,14 @@ def poll_rule(self, processing):
         # p = processing
         # rule_id = p['processing_metadata']['rule_id']
         proc = processing['processing_metadata']['processing']
-        rule_id = proc.external_id
+
+        if proc.external_id:
+            rule_id = proc.external_id
+        elif self.rule_id:
+            rule_id = self.rule_id
+        else:
+            rule_id = proc.processing_metadata.get('rule_id', None)
+
         self.logger.debug("rule_id: %s" % rule_id)
         replicases_status = {}
         if rule_id:
diff --git a/main/config_default/idds.cfg b/main/config_default/idds.cfg
index ec72d59d..5ca6c8fd 100755
--- a/main/config_default/idds.cfg
+++ b/main/config_default/idds.cfg
@@ -50,13 +50,13 @@ retrieve_bulk_size = 32
 pending_time = 30
 
 [transformer]
-num_threads = 16
+num_threads = 3
 poll_period = 600
 new_poll_period = 60
 update_poll_period = 600
 poll_time_period = 60
-retrieve_bulk_size = 32
+retrieve_bulk_size = 64
 poll_operation_time_period = 240
 message_bulk_size = 1000
 
diff --git a/main/lib/idds/agents/carrier/submitter.py b/main/lib/idds/agents/carrier/submitter.py
index 37e8efed..66f4a844 100644
--- a/main/lib/idds/agents/carrier/submitter.py
+++ b/main/lib/idds/agents/carrier/submitter.py
@@ -36,7 +36,8 @@ def __init__(self, num_threads=1, max_number_workers=3, poll_period=10, retries=
 
         self.set_max_workers()
         num_threads = self.max_number_workers
-        super(Submitter, self).__init__(num_threads=num_threads, max_number_workers=self.max_number_workers, name=name, **kwargs)
+        super(Submitter, self).__init__(num_threads=num_threads, max_number_workers=self.max_number_workers,
+                                        name=name, retrieve_bulk_size=retrieve_bulk_size, **kwargs)
 
     def get_new_processings(self):
         """
diff --git a/main/lib/idds/agents/carrier/trigger.py b/main/lib/idds/agents/carrier/trigger.py
index 5d86b550..15b3e619 100644
--- a/main/lib/idds/agents/carrier/trigger.py
+++ b/main/lib/idds/agents/carrier/trigger.py
@@ -42,7 +42,8 @@ def __init__(self, num_threads=1, trigger_max_number_workers=3, max_number_worke
 
         self.set_max_workers()
         num_threads = int(self.max_number_workers)
-        super(Trigger, self).__init__(num_threads=num_threads, name=name, max_number_workers=self.max_number_workers, **kwargs)
+        super(Trigger, self).__init__(num_threads=num_threads, name=name, max_number_workers=self.max_number_workers,
+                                      retrieve_bulk_size=retrieve_bulk_size, **kwargs)
         self.logger.info("num_threads: %s" % num_threads)
 
         if hasattr(self, 'trigger_max_number_workers'):
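The Submitter and Trigger changes above fix the same omission: retrieve_bulk_size was accepted by the subclass __init__ but never forwarded to the base agent, so the configured bulk size (including the idds.cfg value changed above) was silently dropped. A minimal sketch of the pattern, using hypothetical simplified stand-ins for the iDDS agent classes (the real BaseAgent signature may differ):

class BaseAgent(object):
    def __init__(self, num_threads=1, max_number_workers=3,
                 retrieve_bulk_size=None, name='BaseAgent', **kwargs):
        self.num_threads = num_threads
        self.max_number_workers = max_number_workers
        # Stays None (default bulk size) if the subclass drops the argument.
        self.retrieve_bulk_size = retrieve_bulk_size
        self.name = name


class Submitter(BaseAgent):
    def __init__(self, num_threads=1, max_number_workers=3,
                 retrieve_bulk_size=None, name='Submitter', **kwargs):
        # Forward retrieve_bulk_size explicitly, as the patch does.
        super(Submitter, self).__init__(num_threads=num_threads,
                                        max_number_workers=max_number_workers,
                                        name=name,
                                        retrieve_bulk_size=retrieve_bulk_size,
                                        **kwargs)


agent = Submitter(retrieve_bulk_size=64)
assert agent.retrieve_bulk_size == 64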
diff --git a/main/lib/idds/agents/carrier/utils.py b/main/lib/idds/agents/carrier/utils.py
index 6ec99687..e266ac74 100644
--- a/main/lib/idds/agents/carrier/utils.py
+++ b/main/lib/idds/agents/carrier/utils.py
@@ -22,7 +22,8 @@
                                    TransformType2MessageTypeMap,
                                    MessageType, MessageTypeStr,
                                    MessageStatus, MessageSource,
-                                   MessageDestination)
+                                   MessageDestination,
+                                   get_work_status_from_transform_processing_status)
 from idds.common.utils import setup_logging
 from idds.core import (transforms as core_transforms,
                        processings as core_processings,
@@ -1665,7 +1666,9 @@ def sync_collection_status(request_id, transform_id, workload_id, work, input_ou
     return update_collections, all_updates_flushed, messages
 
 
-def sync_work_status(request_id, transform_id, workload_id, work, substatus=None):
+def sync_work_status(request_id, transform_id, workload_id, work, substatus=None, log_prefix=""):
+    logger = get_logger()
+
     input_collections = work.get_input_collections()
     output_collections = work.get_output_collections()
     log_collections = work.get_log_collections()
@@ -1686,6 +1689,10 @@
     is_all_files_failed = False
 
     if is_all_collections_closed:
+        logger.debug(log_prefix + "has_files: %s, is_all_files_processed: %s, is_all_files_failed: %s, substatus: %s" % (has_files,
+                                                                                                                         is_all_files_processed,
+                                                                                                                         is_all_files_failed,
+                                                                                                                         substatus))
         if has_files:
             if is_all_files_processed:
                 work.status = WorkStatus.Finished
@@ -1695,9 +1702,10 @@ def sync_work_status(request_id, transform_id, workload_id, work, substatus=None
                 work.status = WorkStatus.SubFinished
         else:
             if substatus:
-                work.status = substatus
+                work.status = get_work_status_from_transform_processing_status(substatus)
             else:
                 work.status = WorkStatus.Failed
+    logger.debug(log_prefix + "work status: %s, substatus: %s" % (str(work.status), substatus))
 
 
 def sync_processing(processing, agent_attributes, terminate=False, abort=False, logger=None, log_prefix=""):
@@ -1724,7 +1732,7 @@
 
         messages += msgs
 
-    sync_work_status(request_id, transform_id, workload_id, work, processing['substatus'])
+    sync_work_status(request_id, transform_id, workload_id, work, processing['substatus'], log_prefix)
    logger.info(log_prefix + "sync_processing: work status: %s" % work.get_status())
     if terminate and work.is_terminated():
         msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='work')
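The sync_work_status change above fixes a status-space mismatch: processing['substatus'] is a processing/transform status, not a WorkStatus, so assigning it to work.status directly could leave the work in an inconsistent state. A rough sketch of the idea behind get_work_status_from_transform_processing_status, using hypothetical minimal enums (the real enums and mapping in idds.common.constants are more complete):

from enum import Enum


class ProcessingStatus(Enum):
    Finished = 1
    SubFinished = 2
    Failed = 3


class WorkStatus(Enum):
    Finished = 1
    SubFinished = 2
    Failed = 3


# Hypothetical stand-in for get_work_status_from_transform_processing_status.
def work_status_from_processing_status(substatus):
    # Translate into the WorkStatus enum instead of assigning the raw substatus.
    mapping = {ProcessingStatus.Finished: WorkStatus.Finished,
               ProcessingStatus.SubFinished: WorkStatus.SubFinished,
               ProcessingStatus.Failed: WorkStatus.Failed}
    return mapping.get(substatus, WorkStatus.Failed)


assert work_status_from_processing_status(ProcessingStatus.SubFinished) is WorkStatus.SubFinished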
""" - return orm_contents.get_content_status_statistics_by_relation_type(transform_ids, session=session) + if transform_ids and not isinstance(transform_ids, (list, tuple)): + transform_ids = [transform_ids] + if transform_ids and len(transform_ids) == 1: + transform_ids = [transform_ids[0], transform_ids[0]] + + if transform_ids and len(transform_ids) > bulk_size: + chunks = [transform_ids[i:i + bulk_size] for i in range(0, len(transform_ids), bulk_size)] + ret = [] + for chunk in chunks: + tmp = orm_contents.get_content_status_statistics_by_relation_type(chunk, session=session) + ret += tmp + return ret + else: + return orm_contents.get_content_status_statistics_by_relation_type(transform_ids, session=session) @transactional_session diff --git a/main/lib/idds/tests/panda_test.py b/main/lib/idds/tests/panda_test.py index 708ce07a..f3c87045 100644 --- a/main/lib/idds/tests/panda_test.py +++ b/main/lib/idds/tests/panda_test.py @@ -7,8 +7,8 @@ os.environ['PANDA_URL_SSL'] = 'https://pandaserver-doma.cern.ch:25443/server/panda' os.environ['PANDA_BEHIND_REAL_LB'] = "1" -# os.environ['PANDA_URL'] = 'http://rubin-panda-server-dev.slac.stanford.edu:80/server/panda' -# os.environ['PANDA_URL_SSL'] = 'https://rubin-panda-server-dev.slac.stanford.edu:8443/server/panda' +os.environ['PANDA_URL'] = 'http://rubin-panda-server-dev.slac.stanford.edu:80/server/panda' +os.environ['PANDA_URL_SSL'] = 'https://rubin-panda-server-dev.slac.stanford.edu:8443/server/panda' from pandaclient import Client # noqa E402 @@ -31,6 +31,9 @@ task_ids += [162282, 162283, 162588] task_ids = [i for i in range(163930, 164147)] task_ids = [161142, 160648] +task_ids = [165124, 165130, 165135] + [i for i in range(165143, 165149)] +task_ids = [i for i in range(251, 282)] +task_ids = [282, 322, 323, 324, 325] for task_id in task_ids: print("Killing %s" % task_id) ret = Client.killTask(task_id, verbose=True) diff --git a/main/lib/idds/tests/test_migrate_requests.py b/main/lib/idds/tests/test_migrate_requests.py index ca878bd5..cb74006d 100644 --- a/main/lib/idds/tests/test_migrate_requests.py +++ b/main/lib/idds/tests/test_migrate_requests.py @@ -15,8 +15,9 @@ from idds.client.clientmanager import ClientManager +from idds.common.constants import RequestStatus from idds.common.utils import json_dumps # noqa F401 -from idds.rest.v1.utils import convert_old_req_2_workflow_req +from idds.rest.v1.utils import convert_old_req_2_workflow_req # noqa F401 from idds.common.utils import setup_logging @@ -42,7 +43,8 @@ def migrate(): cern_k8s_dev_host = 'https://panda-idds-dev.cern.ch/idds' # noqa F841 # cm1 = ClientManager(host=atlas_host) - cm1 = ClientManager(host=doma_host) + # cm1 = ClientManager(host=doma_host) + cm1 = ClientManager(host=atlas_host) # cm1 = ClientManager(host=slac_k8s_dev_host) # reqs = cm1.get_requests(request_id=290) # old_request_id = 298163 @@ -64,6 +66,12 @@ def migrate(): # old_request_ids = [21] + old_request_ids = [551889] + + old_request_ids = [i for i in range(551911, 556618)] + old_request_ids = [i for i in range(556618, 556712)] + old_request_ids = [i for i in range(556712, 556940)] + old_request_ids = [i for i in range(556940, 557110)] # old_request_id = 1 # for old_request_id in [152]: # for old_request_id in [60]: # noqa E115 @@ -71,10 +79,10 @@ def migrate(): for old_request_id in old_request_ids: # noqa E115 # doma 183 reqs = cm1.get_requests(request_id=old_request_id, with_metadata=True) - # cm2 = ClientManager(host=dev_host) + cm2 = ClientManager(host=dev_host) # cm2 = ClientManager(host=doma_host) # cm2 
diff --git a/main/lib/idds/tests/test_migrate_requests.py b/main/lib/idds/tests/test_migrate_requests.py
index ca878bd5..cb74006d 100644
--- a/main/lib/idds/tests/test_migrate_requests.py
+++ b/main/lib/idds/tests/test_migrate_requests.py
@@ -15,8 +15,9 @@
 
 
 from idds.client.clientmanager import ClientManager
+from idds.common.constants import RequestStatus
 from idds.common.utils import json_dumps  # noqa F401
-from idds.rest.v1.utils import convert_old_req_2_workflow_req
+from idds.rest.v1.utils import convert_old_req_2_workflow_req  # noqa F401
 from idds.common.utils import setup_logging
 
@@ -42,7 +43,8 @@ def migrate():
     cern_k8s_dev_host = 'https://panda-idds-dev.cern.ch/idds'  # noqa F841
 
     # cm1 = ClientManager(host=atlas_host)
-    cm1 = ClientManager(host=doma_host)
+    # cm1 = ClientManager(host=doma_host)
+    cm1 = ClientManager(host=atlas_host)
     # cm1 = ClientManager(host=slac_k8s_dev_host)
     # reqs = cm1.get_requests(request_id=290)
     # old_request_id = 298163
@@ -64,6 +66,12 @@
 
     # old_request_ids = [21]
 
+    old_request_ids = [551889]
+
+    old_request_ids = [i for i in range(551911, 556618)]
+    old_request_ids = [i for i in range(556618, 556712)]
+    old_request_ids = [i for i in range(556712, 556940)]
+    old_request_ids = [i for i in range(556940, 557110)]
     # old_request_id = 1
     # for old_request_id in [152]:
     # for old_request_id in [60]:    # noqa E115
@@ -71,10 +79,10 @@
     for old_request_id in old_request_ids:    # noqa E115
         # doma 183
         reqs = cm1.get_requests(request_id=old_request_id, with_metadata=True)
-        # cm2 = ClientManager(host=dev_host)
+        cm2 = ClientManager(host=dev_host)
         # cm2 = ClientManager(host=doma_host)
         # cm2 = ClientManager(host=atlas_host)
-        cm2 = ClientManager(host=slac_k8s_dev_host)
+        # cm2 = ClientManager(host=slac_k8s_dev_host)
         # cm2 = ClientManager(host=slac_k8s_prod_host)
         # cm2 = ClientManager(host=cern_k8s_dev_host)
         # print(reqs)
@@ -85,7 +93,12 @@
         # workflow = req['request_metadata']['workflow']
         # print(json_dumps(workflow, sort_keys=True, indent=4))
 
-        req = convert_old_req_2_workflow_req(req)
+        # req = convert_old_req_2_workflow_req(req)
+        print(req['status'])
+        if req['status'] in [RequestStatus.Finished]:
+            print('request status: %s, skip' % str(req['status']))
+            continue
+
         workflow = req['request_metadata']['workflow']
         workflow.clean_works()
 
diff --git a/workflow/lib/idds/workflow/workflow.py b/workflow/lib/idds/workflow/workflow.py
index dcbc8fb0..d8539dc1 100644
--- a/workflow/lib/idds/workflow/workflow.py
+++ b/workflow/lib/idds/workflow/workflow.py
@@ -1575,10 +1575,14 @@ def sync_works(self, to_cancel=False):
             work = self.works[k]
             self.log_debug("work %s is_terminated(%s:%s)" % (work.get_internal_id(), work.is_terminated(), work.get_status()))
 
+            if work.get_internal_id() not in self.current_running_works and work.get_status() in [WorkStatus.Transforming]:
+                self.current_running_works.append(work.get_internal_id())
+
         for work in [self.works[k] for k in self.new_to_run_works]:
             if work.transforming:
                 self.new_to_run_works.remove(work.get_internal_id())
-                self.current_running_works.append(work.get_internal_id())
+                if work.get_internal_id() not in self.current_running_works:
+                    self.current_running_works.append(work.get_internal_id())
 
         for work in [self.works[k] for k in self.current_running_works]:
             if isinstance(work, Workflow):
@@ -1614,6 +1618,16 @@ def sync_works(self, to_cancel=False):
                     self.terminated_works.append(work.get_internal_id())
                     self.current_running_works.remove(work.get_internal_id())
 
+        self.num_finished_works = 0
+        self.num_subfinished_works = 0
+        self.num_failed_works = 0
+        self.num_expired_works = 0
+        self.num_cancelled_works = 0
+        self.num_suspended_works = 0
+
+        for k in self.works:
+            work = self.works[k]
+            if work.is_terminated():
                 if work.is_finished():
                     self.num_finished_works += 1
                 elif work.is_subfinished():
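The sync_works changes in workflow.py above and in its workflowv2 counterpart below apply the same two guards: membership checks make appends to current_running_works idempotent when sync runs repeatedly, and the per-status counters are reset and recomputed over all works instead of being incremented in place, which avoids double counting across syncs. A toy sketch of both guards, with hypothetical simplified data shapes:

def sync_running(current_running, statuses):
    # Idempotent append: a work already tracked is not added twice.
    for work_id, status in statuses.items():
        if status == 'transforming' and work_id not in current_running:
            current_running.append(work_id)
    return current_running


def recount(statuses):
    # Reset-and-recount: safe to call any number of times.
    counters = {'finished': 0, 'subfinished': 0, 'failed': 0}
    for status in statuses.values():
        if status in counters:
            counters[status] += 1
    return counters


statuses = {'w1': 'transforming', 'w2': 'finished', 'w3': 'finished'}
running = sync_running(['w1'], statuses)
assert running == ['w1']                       # no duplicate of 'w1'
assert recount(statuses) == recount(statuses)  # stable under repeated syncs
assert recount(statuses)['finished'] == 2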
diff --git a/workflow/lib/idds/workflowv2/workflow.py b/workflow/lib/idds/workflowv2/workflow.py
index 6f4f4341..a999db8f 100644
--- a/workflow/lib/idds/workflowv2/workflow.py
+++ b/workflow/lib/idds/workflowv2/workflow.py
@@ -1697,13 +1697,16 @@ def sync_works(self, to_cancel=False):
                                                                            work.get_status(),
                                                                            work.submitted,
                                                                            work.transforming))
 
+            if work.get_internal_id() not in self.current_running_works and work.get_status() in [WorkStatus.Transforming]:
+                self.current_running_works.append(work.get_internal_id())
         submitting_works = self.submitting_works
         for work in [self.works[k] for k in self.new_to_run_works]:
             if work.transforming:
                 self.new_to_run_works.remove(work.get_internal_id())
                 submitting_works.append(work.get_internal_id())
-                self.current_running_works.append(work.get_internal_id())
+                if work.get_internal_id() not in self.current_running_works:
+                    self.current_running_works.append(work.get_internal_id())
         self.submitting_works = submitting_works
         for work in [self.works[k] for k in self.submitting_works]:
             self.log_info("Work %s is_submitted(%s)" % (work.get_internal_id(), work.submitted))
@@ -1746,6 +1749,16 @@ def sync_works(self, to_cancel=False):
                     self.terminated_works.append(work.get_internal_id())
                     self.current_running_works.remove(work.get_internal_id())
 
+        self.num_finished_works = 0
+        self.num_subfinished_works = 0
+        self.num_failed_works = 0
+        self.num_expired_works = 0
+        self.num_cancelled_works = 0
+        self.num_suspended_works = 0
+
+        for k in self.works:
+            work = self.works[k]
+            if work.is_terminated():
                 if work.is_finished(synchronize=False):
                     self.num_finished_works += 1
                 elif work.is_subfinished(synchronize=False):