Skip to content

Commit

Permalink
fix workflow bug that to_start_works not released
Browse files Browse the repository at this point in the history
  • Loading branch information
wguanicedew committed Oct 27, 2022
1 parent 54fb35e commit e694022
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
3 changes: 3 additions & 0 deletions main/lib/idds/tests/test_domapanda.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ def setup_workflow():
log_collections=[], dependency_map=taskN1.dependencies,
task_name=taskN1.name, task_queue=task_queue,
encode_command_line=True,
prodSourceLabel='managed',
task_log={"dataset": "PandaJob_#{pandaid}/",
"destination": "local",
"param_type": "log",
Expand All @@ -141,6 +142,7 @@ def setup_workflow():
log_collections=[], dependency_map=taskN2.dependencies,
task_name=taskN2.name, task_queue=task_queue,
encode_command_line=True,
prodSourceLabel='managed',
task_log={"dataset": "PandaJob_#{pandaid}/",
"destination": "local",
"param_type": "log",
Expand All @@ -154,6 +156,7 @@ def setup_workflow():
log_collections=[], dependency_map=taskN3.dependencies,
task_name=taskN3.name, task_queue=task_queue,
encode_command_line=True,
prodSourceLabel='managed',
task_log={"dataset": "PandaJob_#{pandaid}/",
"destination": "local",
"param_type": "log",
Expand Down
13 changes: 9 additions & 4 deletions workflow/lib/idds/workflowv2/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1443,19 +1443,24 @@ def get_new_works(self, synchronize=True):
return works

if self.to_start_works:
self.logger.info("%s to_start_works: %s" % (self.get_internal_id(), str(self.to_start_works)))
to_start_works = self.to_start_works.copy()
init_works = self.init_works
starting_works = []
for work_id in to_start_works:
if not self.works[work_id].has_dependency():
starting_works.append(work_id)
self.get_new_work_to_run(work_id)
if not init_works:
init_works.append(work_id)
self.init_works = init_works
if not starting_works:
work_id = to_start_works.pop(0)
starting_works.append(work_id)
for work_id in starting_works:
self.get_new_work_to_run(work_id)
if not init_works:
init_works.append(work_id)
self.init_works = init_works
if work_id in self.to_start_works:
self.to_start_works.remove(work_id)
self.logger.info("%s starting_works: %s" % (self.get_internal_id(), str(starting_works)))

for k in self.new_to_run_works:
if isinstance(self.works[k], Work):
Expand Down

0 comments on commit e694022

Please sign in to comment.