Dev/main (#153)
rhoadesScholar authored Mar 11, 2024
2 parents 2b65fdd + eb09556 commit 990bc16
Showing 39 changed files with 654 additions and 225 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/tests.yaml
@@ -18,7 +18,8 @@ jobs:
strategy:
fail-fast: false
matrix:
platform: [ubuntu-latest, windows-latest, macos-latest]
# platform: [ubuntu-latest, windows-latest, macos-latest]
platform: [ubuntu-latest]
python-version: ['3.10', '3.11']

steps:
4 changes: 4 additions & 0 deletions .gitignore
@@ -18,5 +18,9 @@ scratch/
# vscode stuff
.vscode
.mypy_cache
.pytest_cache
daisy_logs/
tmp/

*.zarr/
*.n5/
6 changes: 4 additions & 2 deletions dacapo/blockwise/argmax_worker.py
@@ -1,4 +1,5 @@
from pathlib import Path
import sys
from dacapo.experiments.datasplits.datasets.arrays.zarr_array import ZarrArray
from dacapo.store.array_store import LocalArrayIdentifier
from dacapo.compute_context import create_compute_context
@@ -60,7 +61,7 @@ def start_worker(
client = daisy.Client()

while True:
print("getting block")
logger.info("getting block")
with client.acquire_block() as block:
if block is None:
break
@@ -87,7 +88,8 @@ def spawn_worker(

# Make the command for the worker to run
command = [
"python",
# "python",
sys.executable,
path,
"start-worker",
"--input_container",
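Note on the change above: swapping the hard-coded "python" for sys.executable makes the spawned worker run under the same interpreter and virtual environment as the process that builds the command, rather than whatever "python" resolves to on the PATH. A minimal sketch of the pattern (the worker script path is a hypothetical placeholder, not DaCapo's actual path):

import sys

# Placeholder path; in DaCapo each worker module passes its own file path here.
worker_script = "path/to/worker.py"

# sys.executable is the absolute path of the running interpreter, so the
# subprocess inherits the same Python environment as the scheduler.
command = [sys.executable, worker_script, "start-worker"]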
30 changes: 17 additions & 13 deletions dacapo/blockwise/blockwise_task.py
@@ -38,18 +38,22 @@ def __init__(
read_write_conflict = worker.read_write_conflict
fit = worker.fit

kwargs = {
"task_id": task_id,
"total_roi": total_roi,
"read_roi": read_roi,
"write_roi": write_roi,
"process_function": process_function,
"check_function": check_function,
"init_callback_fn": init_callback_fn,
"read_write_conflict": read_write_conflict,
"num_workers": num_workers,
"max_retries": max_retries,
"fit": fit,
"timeout": timeout,
"upstream_tasks": upstream_tasks,
}

super().__init__(
task_id,
total_roi,
read_roi,
write_roi,
process_function,
check_function,
init_callback_fn,
read_write_conflict,
num_workers,
max_retries,
fit,
timeout,
upstream_tasks,
**{k: v for k, v in kwargs.items() if v is not None},
)
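The refactor above collects the constructor arguments into a kwargs dict and forwards only the non-None entries, so the parent class falls back to its own defaults instead of receiving explicit None values. A minimal sketch of the pattern with a hypothetical Base class (not the daisy.Task signature):

class Base:
    def __init__(self, name, timeout=30, max_retries=2):
        self.name = name
        self.timeout = timeout
        self.max_retries = max_retries


class Derived(Base):
    def __init__(self, name, timeout=None, max_retries=None):
        kwargs = {"timeout": timeout, "max_retries": max_retries}
        # Forward only the arguments that were actually provided, so the
        # parent's defaults apply instead of explicit None values.
        super().__init__(name, **{k: v for k, v in kwargs.items() if v is not None})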
21 changes: 12 additions & 9 deletions dacapo/blockwise/empanada_function.py
@@ -1,6 +1,7 @@
import numpy as np
import logging

import os
logger = logging.getLogger(__name__)

try:
from empanada_napari.inference import Engine3d
@@ -73,7 +74,7 @@ def orthoplane_inference(engine, volume):
# report instances per class
for tracker in trackers:
class_id = tracker.class_id
print(
logger.info(
f"Class {class_id}, axis {axis_name}, has {len(tracker.instances.keys())} instances"
)

@@ -152,7 +153,7 @@ def start_postprocess_worker(*args):
min_extent=min_extent,
dtype=engine.dtype,
):
print(f"Yielding {class_name} volume of shape {vol.shape}")
logger.info(f"Yielding {class_name} volume of shape {vol.shape}")
yield vol, class_name, tracker

def start_consensus_worker(trackers_dict):
@@ -165,7 +166,7 @@ def start_consensus_worker(trackers_dict):
min_extent=min_extent,
dtype=engine.dtype,
):
print(f"Yielding {class_name} volume of shape {vol.shape}")
logger.info(f"Yielding {class_name} volume of shape {vol.shape}")
yield vol, class_name, tracker

# verify that the image doesn't have extraneous channel dimensions
@@ -181,7 +182,7 @@ def start_consensus_worker(trackers_dict):
else:
raise Exception(f"Image volume must be 3D, got image of shape {shape}")

print(
logger.info(
f"Got 4D image of shape {shape}, extracted single channel of size {image.shape}"
)

@@ -209,7 +210,7 @@ def stack_postprocessing(

# create the final instance segmentations
for class_id, class_name in class_names.items():
print(f"Creating stack segmentation for class {class_name}...")
logger.info(f"Creating stack segmentation for class {class_name}...")

class_tracker = get_axis_trackers_by_class(trackers, class_id)[0]
shape3d = class_tracker.shape3d
@@ -223,7 +224,7 @@ def stack_postprocessing(
filters.remove_small_objects(stack_tracker, min_size=min_size)
filters.remove_pancakes(stack_tracker, min_span=min_extent)

print(f"Total {class_name} objects {len(stack_tracker.instances.keys())}")
logger.info(f"Total {class_name} objects {len(stack_tracker.instances.keys())}")

# decode and fill the instances
stack_vol = np.zeros(shape3d, dtype=dtype)
@@ -253,7 +254,7 @@ def tracker_consensus(
# create the final instance segmentations
for class_id, class_name in class_names.items():
# get the relevant trackers for the class_label
print(f"Creating consensus segmentation for class {class_name}...")
logger.info(f"Creating consensus segmentation for class {class_name}...")

class_trackers = get_axis_trackers_by_class(trackers, class_id)
shape3d = class_trackers[0].shape3d
@@ -270,7 +271,9 @@
class_trackers, pixel_vote_thr
)

print(f"Total {class_name} objects {len(consensus_tracker.instances.keys())}")
logger.info(
f"Total {class_name} objects {len(consensus_tracker.instances.keys())}"
)

# decode and fill the instances
consensus_vol = np.zeros(shape3d, dtype=dtype)
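Because the progress messages above now go through the logging module instead of print, they are only visible when the application configures logging. A minimal setup (an assumption about how a caller might enable these messages, not code from this commit):

import logging

# Show INFO-level messages such as the per-class instance counts logged above.
logging.basicConfig(level=logging.INFO)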
34 changes: 25 additions & 9 deletions dacapo/blockwise/predict_worker.py
@@ -1,3 +1,4 @@
import sys
from pathlib import Path

import torch
@@ -11,6 +12,7 @@
import gunpowder.torch as gp_torch

from funlib.geometry import Coordinate
import daisy

import numpy as np
import click
@@ -165,15 +167,28 @@ def start_worker(
output_size,
voxel_size=output_voxel_size,
)
# use daisy requests to run pipeline
pipeline += gp.DaisyRequestBlocks(
reference=request,
roi_map={raw: "read_roi", prediction: "write_roi"},
num_workers=1,
)
# # use daisy requests to run pipeline
# pipeline += gp.DaisyRequestBlocks(
# reference=request,
# roi_map={raw: "read_roi", prediction: "write_roi"},
# num_workers=1,
# )

daisy_client = daisy.Client()

while True:
with daisy_client.acquire_block() as block:
if block is None:
return

logger.info("Processing block %s", block)

chunk_request = request.copy()
chunk_request[raw].roi = block.read_roi
chunk_request[prediction].roi = block.write_roi

with gp.build(pipeline):
batch = pipeline.request_batch(gp.BatchRequest())
with gp.build(pipeline):
_ = pipeline.request_batch(chunk_request)


def spawn_worker(
@@ -194,7 +209,8 @@ def spawn_worker(

# Make the command for the worker to run
command = [
"python",
# "python",
sys.executable,
path,
"start-worker",
"--run-name",
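The predict worker above replaces the gunpowder DaisyRequestBlocks node with an explicit daisy.Client loop: each worker asks the scheduler for blocks and turns every block into a gunpowder request. A stripped-down sketch of that client loop, with a hypothetical process_block standing in for the gunpowder batch request:

import daisy


def process_block(read_roi, write_roi):
    # Hypothetical per-block work; in predict_worker.py this is a
    # pipeline.request_batch(...) over the block's read and write ROIs.
    pass


def run_worker():
    client = daisy.Client()
    while True:
        # The scheduler hands out blocks until the task is finished; a None
        # block signals that there is no more work.
        with client.acquire_block() as block:
            if block is None:
                break
            process_block(block.read_roi, block.write_roi)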
4 changes: 3 additions & 1 deletion dacapo/blockwise/relabel_worker.py
@@ -1,5 +1,6 @@
from glob import glob
import os
import sys
import daisy
from dacapo.compute_context import create_compute_context
from dacapo.store.array_store import LocalArrayIdentifier
@@ -101,7 +102,8 @@ def spawn_worker(

# Make the command for the worker to run
command = [
"python",
# "python",
sys.executable,
path,
"start-worker",
"--output_container",
70 changes: 67 additions & 3 deletions dacapo/blockwise/scheduler.py
@@ -7,6 +7,9 @@
import yaml

from dacapo.blockwise import DaCapoBlockwiseTask
import logging

logger = logging.getLogger(__name__)


def run_blockwise(
@@ -78,7 +81,9 @@ def run_blockwise(
**kwargs,
)

return daisy.run_blockwise([task])
logger.info("Running blockwise with worker_file: %s", worker_file)
success = daisy.run_blockwise([task])
return success


def segment_blockwise(
@@ -95,7 +100,58 @@
*args,
**kwargs,
):
"""Run a segmentation function in parallel over a large volume.
Args:
segment_function_file (``str`` or ``Path``):
The path to the file containing the necessary worker functions:
``spawn_worker`` and ``start_worker``.
Optionally, the file can also contain a ``check_function`` and an ``init_callback_fn``.
context (``Coordinate``):
The context to add to the read and write ROI.
total_roi (``Roi``):
The ROI to process.
read_roi (``Roi``):
The ROI to read from for a block.
write_roi (``Roi``):
The ROI to write to for a block.
num_workers (``int``):
The number of workers to use.
max_retries (``int``):
The maximum number of times a task will be retried if it fails
(due to a failed post-check, an application crash, or a network failure).
*args:
Additional positional arguments to pass to ``worker_function``.
**kwargs:
Additional keyword arguments to pass to ``worker_function``.
Returns:
``Bool``.
"""
with tempfile.TemporaryDirectory(prefix=tmp_prefix) as tmpdir:
logger.info(
"Running blockwise segmentation, with segment_function_file: %s in temp directory: %s",
segment_function_file,
tmpdir,
)
# write parameters to tmpdir
if "parameters" in kwargs:
with open(Path(tmpdir, "parameters.yaml"), "w") as f:
@@ -116,7 +172,10 @@
*args,
**kwargs,
)

logger.info(
"Running blockwise segmentation with worker_file: %s",
str(Path(Path(dacapo.blockwise.__file__).parent, "segment_worker.py")),
)
success = daisy.run_blockwise([task])

# give a second for the first task to finish
@@ -137,5 +196,10 @@
*args,
**kwargs,
)
logger.info(
"Running blockwise relabeling with worker_file: %s",
str(Path(Path(dacapo.blockwise.__file__).parent, "relabel_worker.py")),
)

return success and daisy.run_blockwise([task])
success = success and daisy.run_blockwise([task])
return success
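A hedged usage sketch of segment_blockwise based on the docstring above; the keyword names follow the docstring, the paths and ROIs are placeholders, and the real signature may include further parameters (for example a tmp_prefix for the temporary directory):

from funlib.geometry import Coordinate, Roi

from dacapo.blockwise.scheduler import segment_blockwise

success = segment_blockwise(
    segment_function_file="path/to/segment_function.py",  # placeholder path
    context=Coordinate((16, 16, 16)),  # padding added around each block
    total_roi=Roi((0, 0, 0), (1024, 1024, 1024)),
    read_roi=Roi((0, 0, 0), (256, 256, 256)),
    write_roi=Roi((16, 16, 16), (224, 224, 224)),
    num_workers=4,
    max_retries=2,
)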
