From 42e7a108df5381471a0b77ce1c499fea9247a86c Mon Sep 17 00:00:00 2001 From: rhoadesScholar Date: Mon, 11 Mar 2024 10:42:44 -0400 Subject: [PATCH] =?UTF-8?q?fix:=20=E2=9A=A1=EF=B8=8F=20Simplified=20tmpdir?= =?UTF-8?q?=20management=20for=20cluster.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dacapo/blockwise/scheduler.py | 133 ++++++++++++++++------------- dacapo/blockwise/segment_worker.py | 14 +-- dacapo/cli.py | 3 - dacapo/options.py | 2 +- 4 files changed, 84 insertions(+), 68 deletions(-) diff --git a/dacapo/blockwise/scheduler.py b/dacapo/blockwise/scheduler.py index 39b9ef6b8..4b4ff44e8 100644 --- a/dacapo/blockwise/scheduler.py +++ b/dacapo/blockwise/scheduler.py @@ -1,4 +1,5 @@ from pathlib import Path +import shutil import tempfile import time import daisy @@ -7,6 +8,7 @@ import yaml from dacapo.blockwise import DaCapoBlockwiseTask +from dacapo import Options import logging logger = logging.getLogger(__name__) @@ -96,7 +98,6 @@ def segment_blockwise( max_retries: int = 2, timeout=None, upstream_tasks=None, - tmp_prefix="tmp", *args, **kwargs, ): @@ -133,6 +134,14 @@ def segment_blockwise( (either due to failed post check or application crashes or network failure) + timeout (``int``): + + The maximum time in seconds to wait for a worker to complete a task. + + upstream_tasks (``List``): + + List of upstream tasks. + *args: Additional positional arguments to pass to ``worker_function``. @@ -145,61 +154,67 @@ def segment_blockwise( ``Bool``. """ - with tempfile.TemporaryDirectory(prefix=tmp_prefix) as tmpdir: - logger.info( - "Running blockwise segmentation, with segment_function_file: ", - segment_function_file, - " in temp directory: ", - tmpdir, - ) - # write parameters to tmpdir - if "parameters" in kwargs: - with open(Path(tmpdir, "parameters.yaml"), "w") as f: - yaml.dump(kwargs.pop("parameters"), f) - - # Make the task - task = DaCapoBlockwiseTask( - str(Path(Path(dacapo.blockwise.__file__).parent, "segment_worker.py")), - total_roi.grow(context, context), - read_roi, - write_roi, - num_workers, - max_retries, - timeout, - upstream_tasks, - tmpdir=tmpdir, - function_path=str(segment_function_file), - *args, - **kwargs, - ) - logger.info( - "Running blockwise segmentation with worker_file: ", - str(Path(Path(dacapo.blockwise.__file__).parent, "segment_worker.py")), - ) - success = daisy.run_blockwise([task]) - - # give a second for the fist task to finish - time.sleep(1) - read_roi = write_roi - - # Make the task - task = DaCapoBlockwiseTask( - str(Path(Path(dacapo.blockwise.__file__).parent, "relabel_worker.py")), - total_roi, - read_roi, - write_roi, - num_workers, - max_retries, - timeout, - upstream_tasks, - tmpdir=tmpdir, - *args, - **kwargs, - ) - logger.info( - "Running blockwise relabeling with worker_file: ", - str(Path(Path(dacapo.blockwise.__file__).parent, "relabel_worker.py")), - ) - - success = success and daisy.run_blockwise([task]) - return success + options = Options.instance() + if not options.runs_base_dir.exists(): + options.runs_base_dir.mkdir(parents=True) + tmpdir = tempfile.mkdtemp(dir=options.runs_base_dir) + + logger.info( + "Running blockwise segmentation, with segment_function_file: ", + segment_function_file, + " in temp directory: ", + tmpdir, + ) + # write parameters to tmpdir + if "parameters" in kwargs: + with open(Path(tmpdir, "parameters.yaml"), "w") as f: + yaml.dump(kwargs.pop("parameters"), f) + + # Make the task + task = DaCapoBlockwiseTask( + str(Path(Path(dacapo.blockwise.__file__).parent, "segment_worker.py")), + total_roi.grow(context, context), + read_roi, + write_roi, + num_workers, + max_retries, + timeout, + upstream_tasks, + tmpdir=tmpdir, + function_path=str(segment_function_file), + *args, + **kwargs, + ) + logger.info( + "Running blockwise segmentation with worker_file: ", + str(Path(Path(dacapo.blockwise.__file__).parent, "segment_worker.py")), + ) + success = daisy.run_blockwise([task]) + + # give a second for the fist task to finish + time.sleep(1) + read_roi = write_roi + + # Make the task + task = DaCapoBlockwiseTask( + str(Path(Path(dacapo.blockwise.__file__).parent, "relabel_worker.py")), + total_roi, + read_roi, + write_roi, + num_workers, + max_retries, + timeout, + upstream_tasks, + tmpdir=tmpdir, + *args, + **kwargs, + ) + logger.info( + "Running blockwise relabeling with worker_file: ", + str(Path(Path(dacapo.blockwise.__file__).parent, "relabel_worker.py")), + ) + + success = success and daisy.run_blockwise([task]) + + shutil.rmtree(tmpdir, ignore_errors=True) + return success diff --git a/dacapo/blockwise/segment_worker.py b/dacapo/blockwise/segment_worker.py index fb1b6423a..0d90a143d 100644 --- a/dacapo/blockwise/segment_worker.py +++ b/dacapo/blockwise/segment_worker.py @@ -170,11 +170,15 @@ def start_worker( nodes = np.unique(edges) logger.info(f"Writing ids to {os.path.join(tmpdir, 'block_%d.npz')}") - np.savez_compressed( - os.path.join(tmpdir, "block_%d.npz" % block.block_id[1]), - nodes=nodes, - edges=edges, - ) + assert os.path.exists(tmpdir) + with open( + os.path.join(tmpdir, f"block_{block.block_id[1]}.npz"), "wb" + ) as f: + np.savez_compressed( + f, + nodes=nodes, + edges=edges, + ) def spawn_worker( diff --git a/dacapo/cli.py b/dacapo/cli.py index 798f5ae75..29541a962 100644 --- a/dacapo/cli.py +++ b/dacapo/cli.py @@ -329,7 +329,6 @@ def run_blockwise( @click.option("-nw", "--num_workers", type=int, default=16) @click.option("-mr", "--max_retries", type=int, default=2) @click.option("-t", "--timeout", type=int, default=None) -@click.option("-tp", "--tmp_prefix", type=str, default="tmp") @click.option("-ow", "--overwrite", is_flag=True, default=True) @click.option("-co", "--channels_out", type=int, default=None) @click.pass_context @@ -347,7 +346,6 @@ def segment_blockwise( num_workers: int = 16, max_retries: int = 2, timeout=None, - tmp_prefix: str = "tmp", overwrite: bool = True, channels_out: Optional[int] = None, *args, @@ -401,7 +399,6 @@ def segment_blockwise( num_workers=num_workers, max_retries=max_retries, timeout=timeout, - tmp_prefix=tmp_prefix, parameters=parameters, *args, **kwargs, diff --git a/dacapo/options.py b/dacapo/options.py index c54892f52..9d6055fc9 100644 --- a/dacapo/options.py +++ b/dacapo/options.py @@ -24,7 +24,7 @@ class DaCapoConfig: runs_base_dir: Path = attr.ib( default=Path(expanduser("~/.dacapo")), metadata={ - "help_text": "The path at DaCapo will use for reading and writing any necessary data." + "help_text": "The path at DaCapo will use for reading and writing any necessary data. This should be an absolute path." }, ) compute_context: dict = attr.ib(