Skip to content

Commit

Permalink
send input collection closed messages
Browse files Browse the repository at this point in the history
  • Loading branch information
wguanicedew committed Feb 10, 2023
1 parent 9baad90 commit 6d6b859
Showing 1 changed file with 43 additions and 6 deletions.
49 changes: 43 additions & 6 deletions main/lib/idds/agents/carrier/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,25 @@ def generate_messages(request_id, transform_id, workload_id, work, msg_type='fil
'num_contents': num_msg_content,
'msg_content': msg_content}
return [msg]
elif msg_type == 'collection':
msg_type_contents = []
for coll in files:
msg_type_content = generate_collection_messages(request_id, transform_id, workload_id, work, coll, relation_type=relation_type)
msg_type_contents.append(msg_type_content)

msgs = []
for i_msg_type, msg_content, num_msg_content in msg_type_contents:
msg = {'msg_type': i_msg_type,
'status': MessageStatus.New,
'source': MessageSource.Carrier,
'destination': MessageDestination.Outside,
'request_id': request_id,
'workload_id': workload_id,
'transform_id': transform_id,
'num_contents': num_msg_content,
'msg_content': msg_content}
msgs.append(msg)
return msgs
elif msg_type == 'work':
# link collections
input_collections = work.get_input_collections()
Expand Down Expand Up @@ -1204,6 +1223,7 @@ def sync_collection_status(request_id, transform_id, workload_id, work, input_ou

all_updates_flushed = True
coll_status = {}
messages = []
for map_id in input_output_maps:
inputs = input_output_maps[map_id]['inputs'] if 'inputs' in input_output_maps[map_id] else []
# inputs_dependency = input_output_maps[map_id]['inputs_dependency'] if 'inputs_dependency' in input_output_maps[map_id] else []
Expand Down Expand Up @@ -1296,6 +1316,20 @@ def sync_collection_status(request_id, transform_id, workload_id, work, input_ou
'processed_ext_files': coll.processed_ext_files,
'failed_ext_files': coll.failed_ext_files,
'missing_ext_files': coll.missing_ext_files}

if coll in input_collections:
if coll.total_files == coll.processed_files + coll.failed_files + coll.missing_files:
coll_db = core_catalog.get_collection(coll_id=coll.coll_id)
coll.status = coll_db['status']
if coll.status is not None and coll.status != CollectionStatus.Closed:
u_coll['status'] = CollectionStatus.Closed
u_coll['substatus'] = CollectionStatus.Closed
coll.status = CollectionStatus.Closed
coll.substatus = CollectionStatus.Closed

msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='collection', files=[coll], relation_type='input')
messages += msgs

if terminate:
if coll in output_collections and work.require_ext_contents():
if coll.processed_files == coll.processed_ext_files and coll.failed_files == coll.failed_ext_files:
Expand All @@ -1313,7 +1347,7 @@ def sync_collection_status(request_id, transform_id, workload_id, work, input_ou
coll.substatus = CollectionStatus.Closed

update_collections.append(u_coll)
return update_collections, all_updates_flushed
return update_collections, all_updates_flushed, messages


def sync_work_status(request_id, transform_id, workload_id, work):
Expand Down Expand Up @@ -1353,16 +1387,19 @@ def sync_processing(processing, agent_attributes, terminate=False, logger=None,
work = proc.work
work.set_agent_attributes(agent_attributes, processing)

messages = []
input_output_maps = get_input_output_maps(transform_id, work)
update_collections, all_updates_flushed = sync_collection_status(request_id, transform_id, workload_id, work,
input_output_maps=input_output_maps,
close_collection=True, terminate=terminate)
update_collections, all_updates_flushed, msgs = sync_collection_status(request_id, transform_id, workload_id, work,
input_output_maps=input_output_maps,
close_collection=True, terminate=terminate)

messages += msgs

messages = []
sync_work_status(request_id, transform_id, workload_id, work)
logger.info(log_prefix + "sync_processing: work status: %s" % work.get_status())
if terminate and work.is_terminated():
messages = generate_messages(request_id, transform_id, workload_id, work, msg_type='work')
msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='work')
messages += msgs
if work.is_finished():
# processing['status'] = ProcessingStatus.Finished
processing['status'] = processing['substatus']
Expand Down

0 comments on commit 6d6b859

Please sign in to comment.