From db7d8aa0950939f90c666d55304edf62f958f42b Mon Sep 17 00:00:00 2001
From: rhoadesScholar
Date: Mon, 11 Mar 2024 15:53:12 -0400
Subject: [PATCH] =?UTF-8?q?fix:=20=E2=9A=A1=EF=B8=8F=20Fix=20logging,=20bl?=
 =?UTF-8?q?ock=20alignment,=20tmpdirs.?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 dacapo/blockwise/predict_worker.py |  6 ------
 dacapo/blockwise/relabel_worker.py | 10 +++++++++-
 dacapo/blockwise/segment_worker.py |  7 +++----
 dacapo/cli.py                      |  2 ++
 dacapo/compute_context/bsub.py     | 15 +++++++++++----
 5 files changed, 25 insertions(+), 15 deletions(-)

diff --git a/dacapo/blockwise/predict_worker.py b/dacapo/blockwise/predict_worker.py
index 68a2eec1a..e1b49b0c9 100644
--- a/dacapo/blockwise/predict_worker.py
+++ b/dacapo/blockwise/predict_worker.py
@@ -165,12 +165,6 @@ def start_worker(
         output_size,
         voxel_size=output_voxel_size,
     )
-    # # use daisy requests to run pipeline
-    # pipeline += gp.DaisyRequestBlocks(
-    #     reference=request,
-    #     roi_map={raw: "read_roi", prediction: "write_roi"},
-    #     num_workers=1,
-    # )
 
     daisy_client = daisy.Client()
 
diff --git a/dacapo/blockwise/relabel_worker.py b/dacapo/blockwise/relabel_worker.py
index 654be6bd8..b374f7120 100644
--- a/dacapo/blockwise/relabel_worker.py
+++ b/dacapo/blockwise/relabel_worker.py
@@ -1,6 +1,7 @@
 from glob import glob
 import os
 import sys
+from time import sleep
 import daisy
 from dacapo.compute_context import create_compute_context
 from dacapo.store.array_store import LocalArrayIdentifier
@@ -54,7 +55,14 @@ def start_worker(
             if block is None:
                 break
 
-            relabel_in_block(array_out, nodes, components, block)
+            try:
+                relabel_in_block(array_out, nodes, components, block)
+            except OSError as e:
+                logging.error(
+                    f"Failed to relabel block {block.write_roi}: {e}. Trying again."
+                )
+                sleep(1)
+                relabel_in_block(array_out, nodes, components, block)
 
 
 def relabel_in_block(array_out, old_values, new_values, block):
diff --git a/dacapo/blockwise/segment_worker.py b/dacapo/blockwise/segment_worker.py
index 1d01ec0c8..da1e0c098 100644
--- a/dacapo/blockwise/segment_worker.py
+++ b/dacapo/blockwise/segment_worker.py
@@ -167,11 +167,10 @@ def start_worker(
                 edges = unique_pairs[non_zero_filter]
                 nodes = np.unique(edges)
 
-                print(f"Writing ids to {os.path.join(tmpdir, 'block_%d.npz')}")
                 assert os.path.exists(tmpdir)
-                with open(
-                    os.path.join(tmpdir, f"block_{block.block_id[1]}.npz"), "wb"
-                ) as f:
+                path = os.path.join(tmpdir, f"block_{block.block_id[1]}.npz")
+                print(f"Writing ids to {path}")
+                with open(path, "wb") as f:
                     np.savez_compressed(
                         f,
                         nodes=nodes,
diff --git a/dacapo/cli.py b/dacapo/cli.py
index 22cd453ac..4136c460c 100644
--- a/dacapo/cli.py
+++ b/dacapo/cli.py
@@ -262,6 +262,7 @@ def run_blockwise(
         input_array.voxel_size,
         output_dtype,
         overwrite=overwrite,
+        write_size=write_roi.shape,
     )
 
     _run_blockwise(  # type: ignore
@@ -383,6 +384,7 @@ def segment_blockwise(
         input_array.voxel_size,
         np.uint64,
         overwrite=overwrite,
+        write_size=write_roi.shape,
     )
     print(
         f"Created output array {output_array_identifier.container}:{output_array_identifier.dataset} with ROI {_total_roi}."
diff --git a/dacapo/compute_context/bsub.py b/dacapo/compute_context/bsub.py
index f89b08056..2c10da73b 100644
--- a/dacapo/compute_context/bsub.py
+++ b/dacapo/compute_context/bsub.py
@@ -1,4 +1,7 @@
+import os
+from pathlib import Path
 from .compute_context import ComputeContext
+import daisy
 import attr
 
 
@@ -36,6 +39,10 @@ def device(self):
         return "cpu"
 
     def _wrap_command(self, command):
+        client = daisy.Client()
+        basename = str(
+            Path("./daisy_logs", client.task_id, f"worker_{client.worker_id}")
+        )
         return (
             [
                 "bsub",
@@ -47,10 +54,10 @@
                 f"num={self.num_gpus}",
                 "-J",
                 "dacapo",
-                # "-o",
-                # f"{run_name}_train.out",
-                # "-e",
-                # f"{run_name}_train.err",
+                "-o",
+                f"{basename}.out",
+                "-e",
+                f"{basename}.err",
             ]
             + (
                 [
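
Note (not part of the patch): the relabel_worker.py hunk above retries a block once after a transient OSError. A minimal, self-contained sketch of that retry pattern follows; the names with_retry and process_block are hypothetical, introduced only for illustration, and are not dacapo or daisy APIs.

    import logging
    from time import sleep


    def with_retry(fn, *args, retries=1, delay=1.0):
        """Call fn(*args); on OSError, wait `delay` seconds and retry up to `retries` times."""
        for attempt in range(retries + 1):
            try:
                return fn(*args)
            except OSError as e:
                if attempt == retries:
                    raise
                logging.error("Call failed (%s); retrying in %ss", e, delay)
                sleep(delay)


    # Hypothetical usage, mirroring the patched worker loop:
    # with_retry(relabel_in_block, array_out, nodes, components, block)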