fix: ⚡️ Simplified tmpdir management for cluster.
rhoadesScholar committed Mar 11, 2024
1 parent eb09556 commit 42e7a10
Showing 4 changed files with 84 additions and 68 deletions.
133 changes: 74 additions & 59 deletions dacapo/blockwise/scheduler.py
@@ -1,4 +1,5 @@
from pathlib import Path
import shutil
import tempfile
import time
import daisy
@@ -7,6 +8,7 @@
import yaml

from dacapo.blockwise import DaCapoBlockwiseTask
from dacapo import Options
import logging

logger = logging.getLogger(__name__)
@@ -96,7 +98,6 @@ def segment_blockwise(
    max_retries: int = 2,
    timeout=None,
    upstream_tasks=None,
    tmp_prefix="tmp",
    *args,
    **kwargs,
):
@@ -133,6 +134,14 @@ def segment_blockwise(
            (either due to failed post check or application crashes or network
            failure)
        timeout (``int``):
            The maximum time in seconds to wait for a worker to complete a task.
        upstream_tasks (``List``):
            List of upstream tasks.
        *args:
            Additional positional arguments to pass to ``worker_function``.
@@ -145,61 +154,67 @@ def segment_blockwise(
        ``Bool``.
    """
    with tempfile.TemporaryDirectory(prefix=tmp_prefix) as tmpdir:
        logger.info(
            "Running blockwise segmentation, with segment_function_file: %s, in temp directory: %s",
            segment_function_file,
            tmpdir,
        )
        # write parameters to tmpdir
        if "parameters" in kwargs:
            with open(Path(tmpdir, "parameters.yaml"), "w") as f:
                yaml.dump(kwargs.pop("parameters"), f)

        # Make the task
        task = DaCapoBlockwiseTask(
            str(Path(Path(dacapo.blockwise.__file__).parent, "segment_worker.py")),
            total_roi.grow(context, context),
            read_roi,
            write_roi,
            num_workers,
            max_retries,
            timeout,
            upstream_tasks,
            tmpdir=tmpdir,
            function_path=str(segment_function_file),
            *args,
            **kwargs,
        )
        logger.info(
            "Running blockwise segmentation with worker_file: %s",
            str(Path(Path(dacapo.blockwise.__file__).parent, "segment_worker.py")),
        )
        success = daisy.run_blockwise([task])

        # give a second for the first task to finish
        time.sleep(1)
        read_roi = write_roi

        # Make the task
        task = DaCapoBlockwiseTask(
            str(Path(Path(dacapo.blockwise.__file__).parent, "relabel_worker.py")),
            total_roi,
            read_roi,
            write_roi,
            num_workers,
            max_retries,
            timeout,
            upstream_tasks,
            tmpdir=tmpdir,
            *args,
            **kwargs,
        )
        logger.info(
            "Running blockwise relabeling with worker_file: %s",
            str(Path(Path(dacapo.blockwise.__file__).parent, "relabel_worker.py")),
        )

        success = success and daisy.run_blockwise([task])
        return success
    options = Options.instance()
    if not options.runs_base_dir.exists():
        options.runs_base_dir.mkdir(parents=True)
    tmpdir = tempfile.mkdtemp(dir=options.runs_base_dir)

    logger.info(
        "Running blockwise segmentation, with segment_function_file: %s, in temp directory: %s",
        segment_function_file,
        tmpdir,
    )
    # write parameters to tmpdir
    if "parameters" in kwargs:
        with open(Path(tmpdir, "parameters.yaml"), "w") as f:
            yaml.dump(kwargs.pop("parameters"), f)

    # Make the task
    task = DaCapoBlockwiseTask(
        str(Path(Path(dacapo.blockwise.__file__).parent, "segment_worker.py")),
        total_roi.grow(context, context),
        read_roi,
        write_roi,
        num_workers,
        max_retries,
        timeout,
        upstream_tasks,
        tmpdir=tmpdir,
        function_path=str(segment_function_file),
        *args,
        **kwargs,
    )
    logger.info(
        "Running blockwise segmentation with worker_file: %s",
        str(Path(Path(dacapo.blockwise.__file__).parent, "segment_worker.py")),
    )
    success = daisy.run_blockwise([task])

    # give a second for the first task to finish
    time.sleep(1)
    read_roi = write_roi

    # Make the task
    task = DaCapoBlockwiseTask(
        str(Path(Path(dacapo.blockwise.__file__).parent, "relabel_worker.py")),
        total_roi,
        read_roi,
        write_roi,
        num_workers,
        max_retries,
        timeout,
        upstream_tasks,
        tmpdir=tmpdir,
        *args,
        **kwargs,
    )
    logger.info(
        "Running blockwise relabeling with worker_file: %s",
        str(Path(Path(dacapo.blockwise.__file__).parent, "relabel_worker.py")),
    )

    success = success and daisy.run_blockwise([task])

    shutil.rmtree(tmpdir, ignore_errors=True)
    return success
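Why this change matters on a cluster: tempfile.TemporaryDirectory(prefix=tmp_prefix) creates the scratch directory under the system temp location, which is typically node-local /tmp, so workers scheduled on other machines never see the block files the relabel step expects to find. tempfile.mkdtemp(dir=options.runs_base_dir) places it on DaCapo's shared runs_base_dir instead, at the cost of explicit cleanup with shutil.rmtree. A minimal sketch of the pattern as a context manager; the helper name and the try/finally hardening are illustrative, not part of the commit (the commit removes the directory unconditionally after the tasks finish):

import shutil
import tempfile
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def shared_scratch(base_dir: Path):
    # Create the scratch directory on shared storage so every cluster
    # node resolves the same path, then remove it explicitly on exit.
    base_dir.mkdir(parents=True, exist_ok=True)
    tmpdir = tempfile.mkdtemp(dir=base_dir)
    try:
        yield tmpdir
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)

Usage would mirror the scheduler body above: create the directory, run the segment and relabel tasks against it, and let cleanup happen even if a task raises.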
14 changes: 9 additions & 5 deletions dacapo/blockwise/segment_worker.py
@@ -170,11 +170,15 @@ def start_worker(
        nodes = np.unique(edges)

        logger.info(f"Writing ids to {os.path.join(tmpdir, 'block_%d.npz')}")
        np.savez_compressed(
            os.path.join(tmpdir, "block_%d.npz" % block.block_id[1]),
            nodes=nodes,
            edges=edges,
        )
        assert os.path.exists(tmpdir)
        with open(
            os.path.join(tmpdir, f"block_{block.block_id[1]}.npz"), "wb"
        ) as f:
            np.savez_compressed(
                f,
                nodes=nodes,
                edges=edges,
            )


def spawn_worker(
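The worker-side change complements the scheduler: assert os.path.exists(tmpdir) makes a missing or unmounted shared directory fail loudly, and writing through an explicit file handle pins the output to the exact block_<id>.npz path. The relabel worker that consumes these files is not shown in this diff; a hedged sketch of the assumed reader side, with load_block_graphs purely illustrative:

import glob
import os

import numpy as np


def load_block_graphs(tmpdir: str):
    # Assumed consumer pattern: gather every block's node/edge arrays
    # back from the shared scratch directory written by segment_worker.
    all_nodes, all_edges = [], []
    for path in sorted(glob.glob(os.path.join(tmpdir, "block_*.npz"))):
        data = np.load(path)
        all_nodes.append(data["nodes"])
        all_edges.append(data["edges"])
    return np.concatenate(all_nodes), np.concatenate(all_edges)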
3 changes: 0 additions & 3 deletions dacapo/cli.py
@@ -329,7 +329,6 @@ def run_blockwise(
@click.option("-nw", "--num_workers", type=int, default=16)
@click.option("-mr", "--max_retries", type=int, default=2)
@click.option("-t", "--timeout", type=int, default=None)
@click.option("-tp", "--tmp_prefix", type=str, default="tmp")
@click.option("-ow", "--overwrite", is_flag=True, default=True)
@click.option("-co", "--channels_out", type=int, default=None)
@click.pass_context
@@ -347,7 +346,6 @@ def segment_blockwise(
    num_workers: int = 16,
    max_retries: int = 2,
    timeout=None,
    tmp_prefix: str = "tmp",
    overwrite: bool = True,
    channels_out: Optional[int] = None,
    *args,
@@ -401,7 +399,6 @@
        num_workers=num_workers,
        max_retries=max_retries,
        timeout=timeout,
        tmp_prefix=tmp_prefix,
        parameters=parameters,
        *args,
        **kwargs,
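With --tmp_prefix gone, the CLI no longer influences where scratch data lives; note the surviving parameters=parameters passthrough, which the scheduler serializes into the shared tmpdir as parameters.yaml for the workers to pick up. A self-contained sketch of that handoff; the parameter values and the worker-side read are assumptions, only the yaml.dump side appears in the diff:

import tempfile
from pathlib import Path

import yaml

# Scheduler side (as in the diff): dump CLI-provided parameters into the
# shared scratch directory. The values here are purely illustrative.
parameters = {"threshold": 0.5}
tmpdir = tempfile.mkdtemp()
with open(Path(tmpdir, "parameters.yaml"), "w") as f:
    yaml.dump(parameters, f)

# Worker side (assumed): read the same file back from the shared path.
with open(Path(tmpdir, "parameters.yaml")) as f:
    assert yaml.safe_load(f) == parameters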
2 changes: 1 addition & 1 deletion dacapo/options.py
@@ -24,7 +24,7 @@ class DaCapoConfig:
    runs_base_dir: Path = attr.ib(
        default=Path(expanduser("~/.dacapo")),
        metadata={
            "help_text": "The path that DaCapo will use for reading and writing any necessary data."
            "help_text": "The path that DaCapo will use for reading and writing any necessary data. This should be an absolute path."
        },
    )
    compute_context: dict = attr.ib(
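The help-text addition is load-bearing rather than cosmetic: paths created by mkdtemp(dir=runs_base_dir) are handed to workers on other nodes, where a relative runs_base_dir would resolve against a different working directory. A guard one might add at a call site (illustrative, not in the commit):

from dacapo import Options

options = Options.instance()
# The tmpdir created under runs_base_dir is shared with remote workers,
# so the configured path must be absolute (and on a shared filesystem).
assert options.runs_base_dir.is_absolute(), (
    f"runs_base_dir must be absolute, got {options.runs_base_dir}"
)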
