From 1cf6f509d46139f552adf09aa4b1bf5933809f55 Mon Sep 17 00:00:00 2001 From: Philipp Otto Date: Wed, 6 Apr 2022 09:21:13 +0200 Subject: [PATCH] Also store success boolean to pickle output when using multiprocessing (#686) * also store success boolean to pickle output when using multiprocessing instead of slurm * update changelog * merge print and logging instructions; convert print to logging in other places, too * add logging import * fix return value propagation in multiprocessing context with serialization of result * format --- cluster_tools/Changelog.md | 1 + cluster_tools/cluster_tools/__init__.py | 12 ++++++++++-- cluster_tools/cluster_tools/remote.py | 6 ++---- .../cluster_tools/schedulers/cluster_executor.py | 8 ++++---- 4 files changed, 17 insertions(+), 10 deletions(-) diff --git a/cluster_tools/Changelog.md b/cluster_tools/Changelog.md index d3372ae16..53f09b671 100644 --- a/cluster_tools/Changelog.md +++ b/cluster_tools/Changelog.md @@ -10,6 +10,7 @@ For upgrade instructions, please check the respective *Breaking Changes* section [Commits](https://github.com/scalableminds/webknossos-libs/compare/v0.9.17...HEAD) ### Breaking Changes +- The cluster-tools serialize the output of a job in the format `(wasSuccessful, result_value)` to a pickle file if `output_pickle_path` is provided and multiprocessing is used. This is consistent with how it is already done when using a cluster executor (e.g., slurm). 
[#686](https://github.com/scalableminds/webknossos-libs/pull/686) ### Added diff --git a/cluster_tools/cluster_tools/__init__.py b/cluster_tools/cluster_tools/__init__.py index a55b57d9d..f61fa3fce 100644 --- a/cluster_tools/cluster_tools/__init__.py +++ b/cluster_tools/cluster_tools/__init__.py @@ -1,3 +1,4 @@ +import logging import multiprocessing import os import tempfile @@ -161,12 +162,19 @@ def _execute_and_persist_function(output_pickle_path, *args, **kwargs): func = args[0] args = args[1:] - result = func(*args, **kwargs) + try: + result = True, func(*args, **kwargs) + except Exception as exc: + result = False, exc + logging.warning(f"Job computation failed with:\n{exc!r}") with open(output_pickle_path, "wb") as file: pickling.dump(result, file) - return result + if result[0]: + return result[1] + else: + raise result[1] def map_unordered(self, func, args): diff --git a/cluster_tools/cluster_tools/remote.py b/cluster_tools/cluster_tools/remote.py index 6ce6188c1..ac2912196 100644 --- a/cluster_tools/cluster_tools/remote.py +++ b/cluster_tools/cluster_tools/remote.py @@ -47,8 +47,7 @@ def worker(executor, workerid, job_array_index, job_array_index_offset, cfut_dir try: input_file_name = executor.format_infile_name(cfut_dir, workerid_with_idx) - print("trying to read: ", input_file_name) - print("working dir: ", os.getcwd()) + logging.debug(f"Trying to read: {input_file_name} (working dir: {os.getcwd()})") custom_main_path = get_custom_main_path(workerid, executor) with open(input_file_name, "rb") as f: @@ -78,10 +77,9 @@ def worker(executor, workerid, job_array_index, job_array_index_offset, cfut_dir out = pickling.dumps(result) except Exception: - print(traceback.format_exc()) result = False, format_remote_exc() - logging.warning("Job computation failed.") + logging.warning(f"Job computation failed with:\n\n{traceback.format_exc()}") out = pickling.dumps(result) # The .preliminary postfix is added since the output can diff --git 
a/cluster_tools/cluster_tools/schedulers/cluster_executor.py b/cluster_tools/cluster_tools/schedulers/cluster_executor.py index 3410c1b9d..077b300b6 100644 --- a/cluster_tools/cluster_tools/schedulers/cluster_executor.py +++ b/cluster_tools/cluster_tools/schedulers/cluster_executor.py @@ -103,7 +103,7 @@ def executor_key(cls): def handle_kill(self, _signum, _frame): self.wait_thread.stop() job_ids = ",".join(str(id) for id in self.jobs.keys()) - print( + logging.debug( "A termination signal was registered. The following jobs are still running on the cluster:\n{}".format( job_ids ) @@ -204,7 +204,7 @@ def _completion(self, jobid, failed_early): if not self.jobs: self.jobs_empty_cond.notify_all() if self.debug: - print("job completed: {}".format(jobid), file=sys.stderr) + logging.debug("Job completed: {}".format(jobid)) preliminary_outfile_name = with_preliminary_postfix(outfile_name) if failed_early: @@ -298,7 +298,7 @@ def submit(self, fun, *args, **kwargs): jobid = jobids_futures[0].result() if self.debug: - print(f"job submitted: {jobid}", file=sys.stderr) + logging.debug(f"Job submitted: {jobid}") # Thread will wait for it to finish. self.wait_thread.waitFor(preliminary_output_pickle_path, jobid) @@ -420,7 +420,7 @@ def register_jobs( jobid = jobid_future.result() if self.debug: - print( + logging.debug( "Submitted array job {} with JobId {} and {} subjobs.".format( batch_description, jobid, len(futs_with_output_paths) ),