Skip to content

Commit

Permalink
Merge pull request #229 from HSF/dev
Browse files Browse the repository at this point in the history
bulk poller and trigger operations
  • Loading branch information
wguanicedew authored Oct 19, 2023
2 parents 83f35b2 + 06b58da commit e0ad9b5
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 44 deletions.
5 changes: 5 additions & 0 deletions common/lib/idds/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -583,3 +583,8 @@ def pid_exists(pid):
raise
else:
return True


def get_list_chunks(full_list, bulk_size=2000):
    """
    Split a list into consecutive chunks of at most ``bulk_size`` items.

    :param full_list: the sliceable sequence to split. ``None`` is treated
                      as an empty list.
    :param bulk_size: maximum number of items per chunk; must be positive.

    :returns: a list of slices of ``full_list`` in the original order. The
              last chunk may be shorter than ``bulk_size``. An empty input
              gives an empty list.
    :raises ValueError: if ``bulk_size`` is zero or negative.
    """
    if bulk_size <= 0:
        # A negative step makes range() empty, which would silently drop
        # every item instead of processing it; fail loudly instead.
        raise ValueError("bulk_size must be a positive integer, got %s" % bulk_size)
    if full_list is None:
        return []
    return [full_list[start:start + bulk_size] for start in range(0, len(full_list), bulk_size)]
18 changes: 9 additions & 9 deletions main/config_default/idds.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ coordination_interval_delay = 300
[clerk]
num_threads = 16
max_number_workers = 16
poll_period = 600
poll_period = 300
new_poll_period = 10
update_poll_period = 600
update_poll_period = 300
new_command_poll_period = 10
update_command_poll_period = 600
update_command_poll_period = 300

poll_time_period = 60
poll_operation_time_period = 60
Expand All @@ -51,13 +51,13 @@ pending_time = 30

[transformer]
num_threads = 3
poll_period = 600
new_poll_period = 60
update_poll_period = 600
poll_period = 180
new_poll_period = 10
update_poll_period = 180

poll_time_period = 60
retrieve_bulk_size = 64
poll_operation_time_period = 240
poll_operation_time_period = 180
message_bulk_size = 1000

# domapandawork.life_time = 86400
Expand All @@ -70,12 +70,12 @@ trigger_max_number_workers = 8
finisher_max_number_workers = 8
receiver_num_threads = 8

poll_period = 600
poll_period = 300
new_poll_period = 10
update_poll_period = 300

poll_time_period = 60
poll_operation_time_period = 240
poll_operation_time_period = 180
retrieve_bulk_size = 16
message_bulk_size = 1000

Expand Down
6 changes: 5 additions & 1 deletion main/lib/idds/agents/carrier/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class Trigger(Poller):
"""

def __init__(self, num_threads=1, trigger_max_number_workers=3, max_number_workers=3, poll_period=10, retries=3, retrieve_bulk_size=2,
name='Trigger', message_bulk_size=1000, **kwargs):
name='Trigger', message_bulk_size=1000, max_updates_per_round=2000, **kwargs):
if trigger_max_number_workers > num_threads:
self.max_number_workers = trigger_max_number_workers
else:
Expand All @@ -46,6 +46,9 @@ def __init__(self, num_threads=1, trigger_max_number_workers=3, max_number_worke
retrieve_bulk_size=retrieve_bulk_size, **kwargs)
self.logger.info("num_threads: %s" % num_threads)

self.max_updates_per_round = max_updates_per_round
self.logger.info("max_updates_per_round: %s" % self.max_updates_per_round)

if hasattr(self, 'trigger_max_number_workers'):
self.max_number_workers = int(self.trigger_max_number_workers)
self.number_msg_workers = 0
Expand Down Expand Up @@ -97,6 +100,7 @@ def handle_trigger_processing(self, processing, trigger_new_updates=False):
ret_trigger_processing = handle_trigger_processing(processing,
self.agent_attributes,
trigger_new_updates=trigger_new_updates,
max_updates_per_round=self.max_updates_per_round,
logger=self.logger,
log_prefix=log_prefix)
process_status, update_contents, ret_msgs, parameters, update_dep_contents_status_name, update_dep_contents_status, new_update_contents, ret_update_transforms = ret_trigger_processing
Expand Down
86 changes: 63 additions & 23 deletions main/lib/idds/agents/carrier/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,11 +568,12 @@ def get_input_output_sub_maps(inputs, outputs, inputs_dependency, logs=[]):
return input_output_sub_maps


def get_updated_contents_by_input_output_maps(input_output_maps=None, terminated=False, logger=None, log_prefix=''):
def get_updated_contents_by_input_output_maps(input_output_maps=None, terminated=False, max_updates_per_round=2000, logger=None, log_prefix=''):
updated_contents, updated_contents_full_input, updated_contents_full_output = [], [], []
updated_contents_full_input_deps = []
new_update_contents = []

chunks = []
status_to_check = [ContentStatus.Available, ContentStatus.FakeAvailable, ContentStatus.FinalFailed,
ContentStatus.Missing, ContentStatus.Failed, ContentStatus.Lost,
ContentStatus.Deleted]
Expand All @@ -598,6 +599,7 @@ def get_updated_contents_by_input_output_maps(input_output_maps=None, terminated
'coll_id': content['coll_id']}
new_update_contents.append(u_content_substatus)
updated_contents_full_input.append(content)

for content in outputs:
if (content['status'] != content['substatus']) and content['substatus'] in status_to_check:
u_content = {'content_id': content['content_id'],
Expand All @@ -611,6 +613,7 @@ def get_updated_contents_by_input_output_maps(input_output_maps=None, terminated
'coll_id': content['coll_id']}
new_update_contents.append(u_content_substatus)
updated_contents_full_output.append(content)

for content in inputs_dependency:
if (content['status'] != content['substatus']) and content['substatus'] in status_to_check:
u_content = {'content_id': content['content_id'],
Expand Down Expand Up @@ -669,7 +672,21 @@ def get_updated_contents_by_input_output_maps(input_output_maps=None, terminated
'workload_id': content['workload_id'],
'coll_id': content['coll_id']}
new_update_contents.append(u_content_substatus)
return updated_contents, updated_contents_full_input, updated_contents_full_output, updated_contents_full_input_deps, new_update_contents

if len(updated_contents) > max_updates_per_round:
chunk = updated_contents, updated_contents_full_input, updated_contents_full_output, updated_contents_full_input_deps, new_update_contents
chunks.append(chunk)

updated_contents, updated_contents_full_input, updated_contents_full_output = [], [], []
updated_contents_full_input_deps = []
new_update_contents = []

if len(updated_contents) > 0:
chunk = updated_contents, updated_contents_full_input, updated_contents_full_output, updated_contents_full_input_deps, new_update_contents
chunks.append(chunk)

# return updated_contents, updated_contents_full_input, updated_contents_full_output, updated_contents_full_input_deps, new_update_contents
return chunks


def get_transform_dependency_map(transform_id, logger=None, log_prefix=''):
Expand Down Expand Up @@ -1098,12 +1115,13 @@ def get_updated_transforms_by_content_status(request_id=None, transform_id=None,
return update_transforms


def handle_trigger_processing(processing, agent_attributes, trigger_new_updates=False, logger=None, log_prefix=''):
def handle_trigger_processing(processing, agent_attributes, trigger_new_updates=False, max_updates_per_round=2000, logger=None, log_prefix=''):
logger = get_logger(logger)

ret_msgs = []
content_updates = []
ret_update_transforms = []
new_update_contents = []

request_id = processing['request_id']
transform_id = processing['transform_id']
Expand Down Expand Up @@ -1134,15 +1152,19 @@ def handle_trigger_processing(processing, agent_attributes, trigger_new_updates=
con_dict['content_metadata'] = con['content_metadata']
new_contents_update_list.append(con_dict)
# contents_id_list.append(con['content_id'])
core_catalog.update_contents(new_contents_update_list)
new_contents_update_list_chunks = [new_contents_update_list[i:i + max_updates_per_round] for i in range(0, len(new_contents_update_list), max_updates_per_round)]
for chunk in new_contents_update_list_chunks:
core_catalog.update_contents(chunk)
# core_catalog.delete_contents_update(contents=contents_id_list)
core_catalog.delete_contents_update(request_id=request_id, transform_id=transform_id, fetch=True)
logger.debug(log_prefix + "sync contents_update to contents done")

logger.debug(log_prefix + "update_contents_from_others_by_dep_id")
# core_catalog.update_contents_from_others_by_dep_id(request_id=request_id, transform_id=transform_id)
to_triggered_contents = core_catalog.get_update_contents_from_others_by_dep_id(request_id=request_id, transform_id=transform_id)
core_catalog.update_contents(to_triggered_contents)
to_triggered_contents_chunks = [to_triggered_contents[i:i + max_updates_per_round] for i in range(0, len(to_triggered_contents), max_updates_per_round)]
for chunk in to_triggered_contents_chunks:
core_catalog.update_contents(chunk)
logger.debug(log_prefix + "update_contents_from_others_by_dep_id done")

input_output_maps = get_input_output_maps(transform_id, work)
Expand All @@ -1153,25 +1175,43 @@ def handle_trigger_processing(processing, agent_attributes, trigger_new_updates=
ProcessingStatus.Terminating, ProcessingStatus.Cancelled]
if processing['status'] in terminated_status or processing['substatus'] in terminated_status:
terminated_processing = True
updated_contents_ret = get_updated_contents_by_input_output_maps(input_output_maps=input_output_maps, terminated=terminated_processing, logger=logger, log_prefix=log_prefix)

updated_contents, updated_contents_full_input, updated_contents_full_output, updated_contents_full_input_deps, new_update_contents = updated_contents_ret
logger.debug(log_prefix + "handle_trigger_processing: updated_contents[:3] %s" % (updated_contents[:3]))

if updated_contents_full_input:
# if the content is updated by receiver, here is the place to broadcast the messages
msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file',
files=updated_contents_full_input, relation_type='input')
ret_msgs = ret_msgs + msgs
if updated_contents_full_output:
# if the content is updated by receiver, here is the place to broadcast the messages
msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file',
files=updated_contents_full_output, relation_type='output')
ret_msgs = ret_msgs + msgs

content_updates = content_updates + updated_contents

if content_updates or new_update_contents:
updated_contents_ret_chunks = get_updated_contents_by_input_output_maps(input_output_maps=input_output_maps,
terminated=terminated_processing,
max_updates_per_round=max_updates_per_round,
logger=logger,
log_prefix=log_prefix)

has_updates = False
for updated_contents_ret in updated_contents_ret_chunks:
updated_contents, updated_contents_full_input, updated_contents_full_output, updated_contents_full_input_deps, new_update_contents = updated_contents_ret
logger.debug(log_prefix + "handle_trigger_processing: updated_contents[:3] %s" % (updated_contents[:3]))

if updated_contents_full_input:
# if the content is updated by receiver, here is the place to broadcast the messages
msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file',
files=updated_contents_full_input, relation_type='input')
ret_msgs = ret_msgs + msgs
if updated_contents_full_output:
# if the content is updated by receiver, here is the place to broadcast the messages
msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file',
files=updated_contents_full_output, relation_type='output')
ret_msgs = ret_msgs + msgs

# content_updates = content_updates + updated_contents

if updated_contents or new_update_contents:
has_updates = True

core_processings.update_processing_contents(update_processing=None,
update_contents=updated_contents,
new_update_contents=new_update_contents,
messages=ret_msgs)
updated_contents = []
new_update_contents = []
ret_msgs = []

if has_updates:
ret_update_transforms = get_updated_transforms_by_content_status(request_id=request_id,
transform_id=transform_id,
logger=logger,
Expand Down
33 changes: 25 additions & 8 deletions main/lib/idds/core/processings.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from idds.orm.base.session import read_session, transactional_session
from idds.common.constants import ProcessingLocking, ProcessingStatus, GranularityType, ContentRelationType
from idds.common.utils import get_list_chunks
from idds.orm import (processings as orm_processings,
collections as orm_collections,
contents as orm_contents,
Expand Down Expand Up @@ -327,35 +328,51 @@ def update_processing_contents(update_processing, update_contents, update_messag
if update_collections:
orm_collections.update_collections(update_collections, session=session)
if update_contents:
orm_contents.update_contents(update_contents, session=session)
chunks = get_list_chunks(update_contents)
for chunk in chunks:
orm_contents.update_contents(chunk, session=session)
if new_update_contents:
# first add and then delete, to trigger the trigger 'update_content_dep_status'.
# too slow
orm_contents.add_contents_update(new_update_contents, session=session)
chunks = get_list_chunks(new_update_contents)
for chunk in chunks:
orm_contents.add_contents_update(chunk, session=session)
# orm_contents.delete_contents_update(session=session)
pass
if new_contents:
orm_contents.add_contents(new_contents, session=session)
chunks = get_list_chunks(new_contents)
for chunk in chunks:
orm_contents.add_contents(chunk, session=session)
if new_contents_ext:
orm_contents.add_contents_ext(new_contents_ext, session=session)
chunks = get_list_chunks(new_contents_ext)
for chunk in chunks:
orm_contents.add_contents_ext(chunk, session=session)
if update_contents_ext:
orm_contents.update_contents_ext(update_contents_ext, session=session)
chunks = get_list_chunks(update_contents_ext)
for chunk in chunks:
orm_contents.update_contents_ext(chunk, session=session)
if new_input_dependency_contents:
new_input_dependency_contents = resolve_input_dependency_id(new_input_dependency_contents, session=session)
orm_contents.add_contents(new_input_dependency_contents, session=session)
chunks = get_list_chunks(new_input_dependency_contents)
for chunk in chunks:
orm_contents.add_contents(chunk, session=session)
if update_dep_contents:
request_id, update_dep_contents_status_name, update_dep_contents_status = update_dep_contents
for status_name in update_dep_contents_status_name:
status = update_dep_contents_status_name[status_name]
status_content_ids = update_dep_contents_status[status_name]
if status_content_ids:
orm_contents.update_dep_contents(request_id, status_content_ids, status, session=session)
chunks = get_list_chunks(status_content_ids)
for chunk in chunks:
orm_contents.update_dep_contents(request_id, chunk, status, session=session)
if update_processing:
orm_processings.update_processing(processing_id=update_processing['processing_id'],
parameters=update_processing['parameters'],
session=session)
if update_messages:
orm_messages.update_messages(update_messages, bulk_size=message_bulk_size, session=session)
chunks = get_list_chunks(update_messages)
for chunk in chunks:
orm_messages.update_messages(chunk, bulk_size=message_bulk_size, session=session)
if messages:
if not type(messages) in [list, tuple]:
messages = [messages]
Expand Down
2 changes: 1 addition & 1 deletion main/lib/idds/tests/test_domapanda.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
# task_queue = 'SLAC_Rubin_Merge'
# task_queue = 'SLAC_TEST'

task_cloud = None
# task_cloud = None


def randStr(chars=string.ascii_lowercase + string.digits, N=10):
Expand Down
4 changes: 2 additions & 2 deletions main/lib/idds/tests/test_domapanda_big.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ def setup_workflow():
taskN4.dependencies = [
{"name": "00004" + str(k),
"dependencies": [],
"submitted": False} for k in range(1000)
"submitted": False} for k in range(10000)
]

taskN5 = PanDATask()
Expand All @@ -182,7 +182,7 @@ def setup_workflow():
taskN5.dependencies = [
{"name": "00005" + str(k),
"dependencies": [],
"submitted": False} for k in range(1000)
"submitted": False} for k in range(10000)
]

work1 = DomaPanDAWork(executable='echo; sleep 180',
Expand Down

0 comments on commit e0ad9b5

Please sign in to comment.