From 9abbf4e2a8010e2009643a9f1dd1e41efabaf63a Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Thu, 14 Mar 2024 14:34:33 +0100 Subject: [PATCH] add is_processed to workflow --- workflow/lib/idds/workflow/workflow.py | 11 +++++++++++ workflow/lib/idds/workflowv2/workflow.py | 20 ++++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/workflow/lib/idds/workflow/workflow.py b/workflow/lib/idds/workflow/workflow.py index 249c455f..8139f379 100644 --- a/workflow/lib/idds/workflow/workflow.py +++ b/workflow/lib/idds/workflow/workflow.py @@ -1825,6 +1825,12 @@ def is_subfinished(self, synchronize=True): """ return self.is_terminated() and (self.num_finished_works + self.num_subfinished_works > 0 and self.num_finished_works + self.num_subfinished_works <= self.num_total_works) + def is_processed(self, synchronize=True): + """ + *** Function called by Transformer agent. + """ + return self.is_terminated(synchronize=synchronize) and (self.num_finished_works + self.num_subfinished_works > 0 and self.num_finished_works + self.num_subfinished_works <= self.num_total_works) + def is_failed(self, synchronize=True): """ *** Function called by Marshaller agent. @@ -2218,6 +2224,11 @@ def is_subfinished(self, synchronize=True): return self.runs[str(self.num_run)].is_subfinished() return False + def is_processed(self, synchronize=True): + if self.is_terminated(): + return self.runs[str(self.num_run)].is_processed() + return False + def is_failed(self, synchronize=True): if self.is_terminated(): return self.runs[str(self.num_run)].is_failed() diff --git a/workflow/lib/idds/workflowv2/workflow.py b/workflow/lib/idds/workflowv2/workflow.py index fe6b49f3..06c949e1 100644 --- a/workflow/lib/idds/workflowv2/workflow.py +++ b/workflow/lib/idds/workflowv2/workflow.py @@ -1972,6 +1972,12 @@ def is_subfinished(self, synchronize=True): """ return self.is_terminated(synchronize=synchronize) and (self.num_finished_works + self.num_subfinished_works > 0 and self.num_finished_works + self.num_subfinished_works <= self.num_total_works) + def is_processed(self, synchronize=True): + """ + *** Function called by Transformer agent. + """ + return self.is_terminated(synchronize=synchronize) and (self.num_finished_works + self.num_subfinished_works > 0 and self.num_finished_works + self.num_subfinished_works <= self.num_total_works) + def is_failed(self, synchronize=True): """ *** Function called by Marshaller agent. @@ -2500,6 +2506,20 @@ def is_subfinished(self, synchronize=True): return self.runs[str(self.num_run)].is_subfinished(synchronize=False) return False + def is_processed(self, synchronize=True): + if self.is_terminated(synchronize=synchronize): + build_work = self.get_build_work() + if build_work: + if build_work.is_terminated(): + if not build_work.is_processed(): + return False + else: + pass + else: + return False + return self.runs[str(self.num_run)].is_processed(synchronize=False) + return False + def is_failed(self, synchronize=True): if self.is_terminated(synchronize=synchronize): build_work = self.get_build_work()