diff --git a/common/lib/idds/common/utils.py b/common/lib/idds/common/utils.py index cb85e9ad..6090cf73 100644 --- a/common/lib/idds/common/utils.py +++ b/common/lib/idds/common/utils.py @@ -583,3 +583,8 @@ def pid_exists(pid): raise else: return True + + +def get_list_chunks(full_list, bulk_size=2000): + chunks = [full_list[i:i + bulk_size] for i in range(0, len(full_list), bulk_size)] + return chunks diff --git a/main/config_default/idds.cfg b/main/config_default/idds.cfg index 05bb03dc..afa1db6a 100755 --- a/main/config_default/idds.cfg +++ b/main/config_default/idds.cfg @@ -38,11 +38,11 @@ coordination_interval_delay = 300 [clerk] num_threads = 16 max_number_workers = 16 -poll_period = 600 +poll_period = 300 new_poll_period = 10 -update_poll_period = 600 +update_poll_period = 300 new_command_poll_period = 10 -update_command_poll_period = 600 +update_command_poll_period = 300 poll_time_period = 60 poll_operation_time_period = 60 @@ -51,13 +51,13 @@ pending_time = 30 [transformer] num_threads = 3 -poll_period = 600 -new_poll_period = 60 -update_poll_period = 600 +poll_period = 180 +new_poll_period = 10 +update_poll_period = 180 poll_time_period = 60 retrieve_bulk_size = 64 -poll_operation_time_period = 240 +poll_operation_time_period = 180 message_bulk_size = 1000 # domapandawork.life_time = 86400 @@ -70,12 +70,12 @@ trigger_max_number_workers = 8 finisher_max_number_workers = 8 receiver_num_threads = 8 -poll_period = 600 +poll_period = 300 new_poll_period = 10 update_poll_period = 300 poll_time_period = 60 -poll_operation_time_period = 240 +poll_operation_time_period = 180 retrieve_bulk_size = 16 message_bulk_size = 1000 diff --git a/main/lib/idds/agents/carrier/trigger.py b/main/lib/idds/agents/carrier/trigger.py index 15b3e619..1cc33643 100644 --- a/main/lib/idds/agents/carrier/trigger.py +++ b/main/lib/idds/agents/carrier/trigger.py @@ -33,7 +33,7 @@ class Trigger(Poller): """ def __init__(self, num_threads=1, trigger_max_number_workers=3, max_number_workers=3, poll_period=10, retries=3, retrieve_bulk_size=2, - name='Trigger', message_bulk_size=1000, **kwargs): + name='Trigger', message_bulk_size=1000, max_updates_per_round=2000, **kwargs): if trigger_max_number_workers > num_threads: self.max_number_workers = trigger_max_number_workers else: @@ -46,6 +46,9 @@ def __init__(self, num_threads=1, trigger_max_number_workers=3, max_number_worke retrieve_bulk_size=retrieve_bulk_size, **kwargs) self.logger.info("num_threads: %s" % num_threads) + self.max_updates_per_round = max_updates_per_round + self.logger.info("max_updates_per_round: %s" % self.max_updates_per_round) + if hasattr(self, 'trigger_max_number_workers'): self.max_number_workers = int(self.trigger_max_number_workers) self.number_msg_workers = 0 @@ -97,6 +100,7 @@ def handle_trigger_processing(self, processing, trigger_new_updates=False): ret_trigger_processing = handle_trigger_processing(processing, self.agent_attributes, trigger_new_updates=trigger_new_updates, + max_updates_per_round=self.max_updates_per_round, logger=self.logger, log_prefix=log_prefix) process_status, update_contents, ret_msgs, parameters, update_dep_contents_status_name, update_dep_contents_status, new_update_contents, ret_update_transforms = ret_trigger_processing diff --git a/main/lib/idds/agents/carrier/utils.py b/main/lib/idds/agents/carrier/utils.py index b1d745ae..9f4546d8 100644 --- a/main/lib/idds/agents/carrier/utils.py +++ b/main/lib/idds/agents/carrier/utils.py @@ -568,11 +568,12 @@ def get_input_output_sub_maps(inputs, outputs, inputs_dependency, logs=[]): return input_output_sub_maps -def get_updated_contents_by_input_output_maps(input_output_maps=None, terminated=False, logger=None, log_prefix=''): +def get_updated_contents_by_input_output_maps(input_output_maps=None, terminated=False, max_updates_per_round=2000, logger=None, log_prefix=''): updated_contents, updated_contents_full_input, updated_contents_full_output = [], [], [] updated_contents_full_input_deps = [] new_update_contents = [] + chunks = [] status_to_check = [ContentStatus.Available, ContentStatus.FakeAvailable, ContentStatus.FinalFailed, ContentStatus.Missing, ContentStatus.Failed, ContentStatus.Lost, ContentStatus.Deleted] @@ -598,6 +599,7 @@ def get_updated_contents_by_input_output_maps(input_output_maps=None, terminated 'coll_id': content['coll_id']} new_update_contents.append(u_content_substatus) updated_contents_full_input.append(content) + for content in outputs: if (content['status'] != content['substatus']) and content['substatus'] in status_to_check: u_content = {'content_id': content['content_id'], @@ -611,6 +613,7 @@ def get_updated_contents_by_input_output_maps(input_output_maps=None, terminated 'coll_id': content['coll_id']} new_update_contents.append(u_content_substatus) updated_contents_full_output.append(content) + for content in inputs_dependency: if (content['status'] != content['substatus']) and content['substatus'] in status_to_check: u_content = {'content_id': content['content_id'], @@ -669,7 +672,21 @@ def get_updated_contents_by_input_output_maps(input_output_maps=None, terminated 'workload_id': content['workload_id'], 'coll_id': content['coll_id']} new_update_contents.append(u_content_substatus) - return updated_contents, updated_contents_full_input, updated_contents_full_output, updated_contents_full_input_deps, new_update_contents + + if len(updated_contents) > max_updates_per_round: + chunk = updated_contents, updated_contents_full_input, updated_contents_full_output, updated_contents_full_input_deps, new_update_contents + chunks.append(chunk) + + updated_contents, updated_contents_full_input, updated_contents_full_output = [], [], [] + updated_contents_full_input_deps = [] + new_update_contents = [] + + if len(updated_contents) > 0: + chunk = updated_contents, updated_contents_full_input, updated_contents_full_output, updated_contents_full_input_deps, new_update_contents + chunks.append(chunk) + + # return updated_contents, updated_contents_full_input, updated_contents_full_output, updated_contents_full_input_deps, new_update_contents + return chunks def get_transform_dependency_map(transform_id, logger=None, log_prefix=''): @@ -1098,12 +1115,13 @@ def get_updated_transforms_by_content_status(request_id=None, transform_id=None, return update_transforms -def handle_trigger_processing(processing, agent_attributes, trigger_new_updates=False, logger=None, log_prefix=''): +def handle_trigger_processing(processing, agent_attributes, trigger_new_updates=False, max_updates_per_round=2000, logger=None, log_prefix=''): logger = get_logger(logger) ret_msgs = [] content_updates = [] ret_update_transforms = [] + new_update_contents = [] request_id = processing['request_id'] transform_id = processing['transform_id'] @@ -1134,7 +1152,9 @@ def handle_trigger_processing(processing, agent_attributes, trigger_new_updates= con_dict['content_metadata'] = con['content_metadata'] new_contents_update_list.append(con_dict) # contents_id_list.append(con['content_id']) - core_catalog.update_contents(new_contents_update_list) + new_contents_update_list_chunks = [new_contents_update_list[i:i + max_updates_per_round] for i in range(0, len(new_contents_update_list), max_updates_per_round)] + for chunk in new_contents_update_list_chunks: + core_catalog.update_contents(chunk) # core_catalog.delete_contents_update(contents=contents_id_list) core_catalog.delete_contents_update(request_id=request_id, transform_id=transform_id, fetch=True) logger.debug(log_prefix + "sync contents_update to contents done") @@ -1142,7 +1162,9 @@ def handle_trigger_processing(processing, agent_attributes, trigger_new_updates= logger.debug(log_prefix + "update_contents_from_others_by_dep_id") # core_catalog.update_contents_from_others_by_dep_id(request_id=request_id, transform_id=transform_id) to_triggered_contents = core_catalog.get_update_contents_from_others_by_dep_id(request_id=request_id, transform_id=transform_id) - core_catalog.update_contents(to_triggered_contents) + to_triggered_contents_chunks = [to_triggered_contents[i:i + max_updates_per_round] for i in range(0, len(to_triggered_contents), max_updates_per_round)] + for chunk in to_triggered_contents_chunks: + core_catalog.update_contents(chunk) logger.debug(log_prefix + "update_contents_from_others_by_dep_id done") input_output_maps = get_input_output_maps(transform_id, work) @@ -1153,25 +1175,43 @@ def handle_trigger_processing(processing, agent_attributes, trigger_new_updates= ProcessingStatus.Terminating, ProcessingStatus.Cancelled] if processing['status'] in terminated_status or processing['substatus'] in terminated_status: terminated_processing = True - updated_contents_ret = get_updated_contents_by_input_output_maps(input_output_maps=input_output_maps, terminated=terminated_processing, logger=logger, log_prefix=log_prefix) - - updated_contents, updated_contents_full_input, updated_contents_full_output, updated_contents_full_input_deps, new_update_contents = updated_contents_ret - logger.debug(log_prefix + "handle_trigger_processing: updated_contents[:3] %s" % (updated_contents[:3])) - - if updated_contents_full_input: - # if the content is updated by receiver, here is the place to broadcast the messages - msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file', - files=updated_contents_full_input, relation_type='input') - ret_msgs = ret_msgs + msgs - if updated_contents_full_output: - # if the content is updated by receiver, here is the place to broadcast the messages - msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file', - files=updated_contents_full_output, relation_type='output') - ret_msgs = ret_msgs + msgs - - content_updates = content_updates + updated_contents - if content_updates or new_update_contents: + updated_contents_ret_chunks = get_updated_contents_by_input_output_maps(input_output_maps=input_output_maps, + terminated=terminated_processing, + max_updates_per_round=max_updates_per_round, + logger=logger, + log_prefix=log_prefix) + + has_updates = False + for updated_contents_ret in updated_contents_ret_chunks: + updated_contents, updated_contents_full_input, updated_contents_full_output, updated_contents_full_input_deps, new_update_contents = updated_contents_ret + logger.debug(log_prefix + "handle_trigger_processing: updated_contents[:3] %s" % (updated_contents[:3])) + + if updated_contents_full_input: + # if the content is updated by receiver, here is the place to broadcast the messages + msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file', + files=updated_contents_full_input, relation_type='input') + ret_msgs = ret_msgs + msgs + if updated_contents_full_output: + # if the content is updated by receiver, here is the place to broadcast the messages + msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file', + files=updated_contents_full_output, relation_type='output') + ret_msgs = ret_msgs + msgs + + # content_updates = content_updates + updated_contents + + if updated_contents or new_update_contents: + has_updates = True + + core_processings.update_processing_contents(update_processing=None, + update_contents=updated_contents, + new_update_contents=new_update_contents, + messages=ret_msgs) + updated_contents = [] + new_update_contents = [] + ret_msgs = [] + + if has_updates: ret_update_transforms = get_updated_transforms_by_content_status(request_id=request_id, transform_id=transform_id, logger=logger, diff --git a/main/lib/idds/core/processings.py b/main/lib/idds/core/processings.py index 2b08ef53..b336f48e 100644 --- a/main/lib/idds/core/processings.py +++ b/main/lib/idds/core/processings.py @@ -17,6 +17,7 @@ from idds.orm.base.session import read_session, transactional_session from idds.common.constants import ProcessingLocking, ProcessingStatus, GranularityType, ContentRelationType +from idds.common.utils import get_list_chunks from idds.orm import (processings as orm_processings, collections as orm_collections, contents as orm_contents, @@ -327,35 +328,51 @@ def update_processing_contents(update_processing, update_contents, update_messag if update_collections: orm_collections.update_collections(update_collections, session=session) if update_contents: - orm_contents.update_contents(update_contents, session=session) + chunks = get_list_chunks(update_contents) + for chunk in chunks: + orm_contents.update_contents(chunk, session=session) if new_update_contents: # first add and then delete, to trigger the trigger 'update_content_dep_status'. # too slow - orm_contents.add_contents_update(new_update_contents, session=session) + chunks = get_list_chunks(new_update_contents) + for chunk in chunks: + orm_contents.add_contents_update(chunk, session=session) # orm_contents.delete_contents_update(session=session) pass if new_contents: - orm_contents.add_contents(new_contents, session=session) + chunks = get_list_chunks(new_contents) + for chunk in chunks: + orm_contents.add_contents(chunk, session=session) if new_contents_ext: - orm_contents.add_contents_ext(new_contents_ext, session=session) + chunks = get_list_chunks(new_contents_ext) + for chunk in chunks: + orm_contents.add_contents_ext(chunk, session=session) if update_contents_ext: - orm_contents.update_contents_ext(update_contents_ext, session=session) + chunks = get_list_chunks(update_contents_ext) + for chunk in chunks: + orm_contents.update_contents_ext(chunk, session=session) if new_input_dependency_contents: new_input_dependency_contents = resolve_input_dependency_id(new_input_dependency_contents, session=session) - orm_contents.add_contents(new_input_dependency_contents, session=session) + chunks = get_list_chunks(new_input_dependency_contents) + for chunk in chunks: + orm_contents.add_contents(chunk, session=session) if update_dep_contents: request_id, update_dep_contents_status_name, update_dep_contents_status = update_dep_contents for status_name in update_dep_contents_status_name: status = update_dep_contents_status_name[status_name] status_content_ids = update_dep_contents_status[status_name] if status_content_ids: - orm_contents.update_dep_contents(request_id, status_content_ids, status, session=session) + chunks = get_list_chunks(status_content_ids) + for chunk in chunks: + orm_contents.update_dep_contents(request_id, chunk, status, session=session) if update_processing: orm_processings.update_processing(processing_id=update_processing['processing_id'], parameters=update_processing['parameters'], session=session) if update_messages: - orm_messages.update_messages(update_messages, bulk_size=message_bulk_size, session=session) + chunks = get_list_chunks(update_messages) + for chunk in chunks: + orm_messages.update_messages(chunk, bulk_size=message_bulk_size, session=session) if messages: if not type(messages) in [list, tuple]: messages = [messages] diff --git a/main/lib/idds/tests/test_domapanda.py b/main/lib/idds/tests/test_domapanda.py index a1a9c4ae..4beebdf3 100644 --- a/main/lib/idds/tests/test_domapanda.py +++ b/main/lib/idds/tests/test_domapanda.py @@ -75,7 +75,7 @@ # task_queue = 'SLAC_Rubin_Merge' # task_queue = 'SLAC_TEST' -task_cloud = None +# task_cloud = None def randStr(chars=string.ascii_lowercase + string.digits, N=10): diff --git a/main/lib/idds/tests/test_domapanda_big.py b/main/lib/idds/tests/test_domapanda_big.py index 84102013..a576af78 100644 --- a/main/lib/idds/tests/test_domapanda_big.py +++ b/main/lib/idds/tests/test_domapanda_big.py @@ -173,7 +173,7 @@ def setup_workflow(): taskN4.dependencies = [ {"name": "00004" + str(k), "dependencies": [], - "submitted": False} for k in range(1000) + "submitted": False} for k in range(10000) ] taskN5 = PanDATask() @@ -182,7 +182,7 @@ def setup_workflow(): taskN5.dependencies = [ {"name": "00005" + str(k), "dependencies": [], - "submitted": False} for k in range(1000) + "submitted": False} for k in range(10000) ] work1 = DomaPanDAWork(executable='echo; sleep 180',