Skip to content

Commit

Permalink
Merge pull request #322 from wguanicedew/dev
Browse files Browse the repository at this point in the history
fix missing event counting for es
  • Loading branch information
wguanicedew authored Jun 13, 2024
2 parents 0175868 + 30855ae commit 2f4b45f
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 4 deletions.
2 changes: 1 addition & 1 deletion doma/lib/idds/doma/workflowv2/domapandawork.py
Original file line number Diff line number Diff line change
Expand Up @@ -1723,7 +1723,7 @@ def poll_panda_task(self, processing=None, input_output_maps=None, contents_ext=
self.logger.debug(log_prefix + "poll_panda_task, task_id: %s, all jobs: %s, unterminated_jobs: %s" % (str(task_id), len(all_jobs_ids), len(unterminated_jobs)))

unterminated_jobs_status = self.poll_panda_jobs(unterminated_jobs, executors=executors, log_prefix=log_prefix)
self.logger.debug("unterminated_jobs_status: %s" % str(unterminated_jobs_status))
self.logger.debug(log_prefix + "unterminated_jobs_status: %s" % str(unterminated_jobs_status))

abort_status = False
if processing_status in [ProcessingStatus.Cancelled]:
Expand Down
7 changes: 4 additions & 3 deletions main/lib/idds/agents/carrier/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1093,7 +1093,7 @@ def trigger_release_inputs(request_id, transform_id, workload_id, work, updated_
return update_contents, update_input_contents_full, update_contents_status_name, update_contents_status


def poll_missing_outputs(input_output_maps, max_updates_per_round=2000):
def poll_missing_outputs(input_output_maps, contents_ext=[], max_updates_per_round=2000):
content_updates_missing, updated_contents_full_missing = [], []

chunks = []
Expand Down Expand Up @@ -1200,6 +1200,7 @@ def handle_update_processing(processing, agent_attributes, max_updates_per_round
logger.debug(log_prefix + "get_new_input_output_maps: len: %s" % len(new_input_output_maps))
logger.debug(log_prefix + "get_new_input_output_maps.keys[:3]: %s" % str(list(new_input_output_maps.keys())[:3]))

contents_ext = []
if work.require_ext_contents():
contents_ext = get_ext_contents(transform_id, work)
job_info_maps = core_catalog.get_contents_ext_maps()
Expand Down Expand Up @@ -1257,7 +1258,7 @@ def handle_update_processing(processing, agent_attributes, max_updates_per_round
ret_futures.add(f)

ret_msgs = []
content_updates_missing_chunks = poll_missing_outputs(input_output_maps, max_updates_per_round=max_updates_per_round)
content_updates_missing_chunks = poll_missing_outputs(input_output_maps, contents_ext=contents_ext, max_updates_per_round=max_updates_per_round)
for content_updates_missing_chunk in content_updates_missing_chunks:
content_updates_missing, updated_contents_full_missing = content_updates_missing_chunk
msgs = []
Expand Down Expand Up @@ -2033,7 +2034,7 @@ def sync_collection_status(request_id, transform_id, workload_id, work, input_ou
coll.substatus = CollectionStatus.Closed
elif coll in output_collections:
if (not work.require_ext_contents() or (work.require_ext_contents()
and coll.processed_files == coll.processed_ext_files and coll.failed_files == coll.failed_ext_files)): # noqa E129, W503
and coll.processed_files == coll.processed_ext_files and coll.failed_files <= coll.failed_ext_files)): # noqa E129, W503
all_ext_updated = True
if (force_close_collection or (close_collection and all_updates_flushed and all_ext_updated and all_files_monitored)
or coll.status == CollectionStatus.Closed): # noqa W503
Expand Down

0 comments on commit 2f4b45f

Please sign in to comment.