diff --git a/doma/lib/idds/doma/workflowv2/domapandawork.py b/doma/lib/idds/doma/workflowv2/domapandawork.py index 81727ef3..710240c9 100644 --- a/doma/lib/idds/doma/workflowv2/domapandawork.py +++ b/doma/lib/idds/doma/workflowv2/domapandawork.py @@ -1723,7 +1723,7 @@ def poll_panda_task(self, processing=None, input_output_maps=None, contents_ext= self.logger.debug(log_prefix + "poll_panda_task, task_id: %s, all jobs: %s, unterminated_jobs: %s" % (str(task_id), len(all_jobs_ids), len(unterminated_jobs))) unterminated_jobs_status = self.poll_panda_jobs(unterminated_jobs, executors=executors, log_prefix=log_prefix) - self.logger.debug("unterminated_jobs_status: %s" % str(unterminated_jobs_status)) + self.logger.debug(log_prefix + "unterminated_jobs_status: %s" % str(unterminated_jobs_status)) abort_status = False if processing_status in [ProcessingStatus.Cancelled]: diff --git a/main/lib/idds/agents/carrier/utils.py b/main/lib/idds/agents/carrier/utils.py index 308e2065..bc7ea883 100644 --- a/main/lib/idds/agents/carrier/utils.py +++ b/main/lib/idds/agents/carrier/utils.py @@ -1093,7 +1093,7 @@ def trigger_release_inputs(request_id, transform_id, workload_id, work, updated_ return update_contents, update_input_contents_full, update_contents_status_name, update_contents_status -def poll_missing_outputs(input_output_maps, max_updates_per_round=2000): +def poll_missing_outputs(input_output_maps, contents_ext=[], max_updates_per_round=2000): content_updates_missing, updated_contents_full_missing = [], [] chunks = [] @@ -1200,6 +1200,7 @@ def handle_update_processing(processing, agent_attributes, max_updates_per_round logger.debug(log_prefix + "get_new_input_output_maps: len: %s" % len(new_input_output_maps)) logger.debug(log_prefix + "get_new_input_output_maps.keys[:3]: %s" % str(list(new_input_output_maps.keys())[:3])) + contents_ext = [] if work.require_ext_contents(): contents_ext = get_ext_contents(transform_id, work) job_info_maps = core_catalog.get_contents_ext_maps() @@ -1257,7 +1258,7 @@ def handle_update_processing(processing, agent_attributes, max_updates_per_round ret_futures.add(f) ret_msgs = [] - content_updates_missing_chunks = poll_missing_outputs(input_output_maps, max_updates_per_round=max_updates_per_round) + content_updates_missing_chunks = poll_missing_outputs(input_output_maps, contents_ext=contents_ext, max_updates_per_round=max_updates_per_round) for content_updates_missing_chunk in content_updates_missing_chunks: content_updates_missing, updated_contents_full_missing = content_updates_missing_chunk msgs = [] @@ -2033,7 +2034,7 @@ def sync_collection_status(request_id, transform_id, workload_id, work, input_ou coll.substatus = CollectionStatus.Closed elif coll in output_collections: if (not work.require_ext_contents() or (work.require_ext_contents() - and coll.processed_files == coll.processed_ext_files and coll.failed_files == coll.failed_ext_files)): # noqa E129, W503 + and coll.processed_files == coll.processed_ext_files and coll.failed_files <= coll.failed_ext_files)): # noqa E129, W503 all_ext_updated = True if (force_close_collection or (close_collection and all_updates_flushed and all_ext_updated and all_files_monitored) or coll.status == CollectionStatus.Closed): # noqa W503