diff --git a/dacapo/experiments/datasplits/datasets/arrays/binarize_array_config.py b/dacapo/experiments/datasplits/datasets/arrays/binarize_array_config.py
index 570739f63..4df39abb4 100644
--- a/dacapo/experiments/datasplits/datasets/arrays/binarize_array_config.py
+++ b/dacapo/experiments/datasplits/datasets/arrays/binarize_array_config.py
@@ -55,12 +55,13 @@ def array(self, mode="r") -> Array:
         assert num_channels is None, "Input labels cannot have a channel dimension"
 
         def group_array(data):
-            out = da.zeros((len(self.groupings), *array.physical_shape), dtype=np.uint8)
-            for i, (_, group_ids) in enumerate(self.groupings):
-                if len(group_ids) == 0:
-                    out[i] = data != self.background
-                else:
-                    out[i] = da.isin(data, group_ids)
+            groups = [
+                da.isin(data, group_ids)
+                if len(group_ids) > 0
+                else data != self.background
+                for _, group_ids in self.groupings
+            ]
+            out = da.stack(groups, axis=0)
             return out
 
         data = group_array(array.data)
diff --git a/dacapo/experiments/datasplits/datasets/arrays/concat_array_config.py b/dacapo/experiments/datasplits/datasets/arrays/concat_array_config.py
index 4de730b18..1de31ccdf 100644
--- a/dacapo/experiments/datasplits/datasets/arrays/concat_array_config.py
+++ b/dacapo/experiments/datasplits/datasets/arrays/concat_array_config.py
@@ -4,7 +4,6 @@
 from typing import List, Dict, Optional
 
 from funlib.persistence import Array
-import numpy as np
 import dask.array as da
 
 
@@ -45,18 +44,15 @@ class ConcatArrayConfig(ArrayConfig):
     def array(self, mode: str = "r") -> Array:
         arrays = [config.array(mode) for _, config in self.source_array_configs.items()]
+        out_data = da.stack([array.data for array in arrays], axis=0)
         out_array = Array(
-            da.zeros(len(arrays), *arrays[0].physical_shape, dtype=arrays[0].dtype),
+            out_data,
             offset=arrays[0].offset,
             voxel_size=arrays[0].voxel_size,
             axis_names=["c^"] + arrays[0].axis_names,
             units=arrays[0].units,
         )
 
-        def set_channels(data):
-            for i, array in enumerate(arrays):
-                data[i] = array.data[:]
-            return data
-
-        out_array.lazy_op(set_channels)
+        # callable lazy op so funlib.persistence doesn't try to recognize this data as writable
+        out_array.lazy_op(lambda data: data)
 
         return out_array
diff --git a/dacapo/experiments/datasplits/datasets/arrays/logical_or_array_config.py b/dacapo/experiments/datasplits/datasets/arrays/logical_or_array_config.py
index a9cde5daa..432e70d3f 100644
--- a/dacapo/experiments/datasplits/datasets/arrays/logical_or_array_config.py
+++ b/dacapo/experiments/datasplits/datasets/arrays/logical_or_array_config.py
@@ -30,14 +30,14 @@ def array(self, mode: str = "r") -> Array:
         assert num_channels_from_array(array) is not None
 
         out_array = Array(
-            da.zeros(*array.physical_shape, dtype=array.dtype),
+            da.zeros(array.physical_shape, dtype=array.dtype),
             offset=array.offset,
             voxel_size=array.voxel_size,
             axis_names=array.axis_names[1:],
             units=array.units,
         )
 
-        out_array.data = da.maximum(array.data, axis=0)
+        out_array.data = da.max(array.data, axis=0)
 
         # mark data as non-writable
         out_array.lazy_op(lambda data: data)
diff --git a/dacapo/experiments/datasplits/datasets/arrays/resampled_array_config.py b/dacapo/experiments/datasplits/datasets/arrays/resampled_array_config.py
index 7a03f89eb..cae500b44 100644
--- a/dacapo/experiments/datasplits/datasets/arrays/resampled_array_config.py
+++ b/dacapo/experiments/datasplits/datasets/arrays/resampled_array_config.py
@@ -5,6 +5,27 @@
 from funlib.geometry import Coordinate
 from funlib.persistence import Array
 
+from xarray_multiscale.multiscale import downscale_dask
+from xarray_multiscale import windowed_mean
+import numpy as np
+import dask.array as da
+from skimage.transform import rescale
+
+from typing import Optional, Sequence
+
+
+def adjust_shape(array: da.Array, scale_factors: Sequence[int]) -> da.Array:
+    """
+    Crop array to a shape that is a multiple of the scale factors.
+    This allows for clean downsampling.
+    """
+    misalignment = np.any(np.mod(array.shape, scale_factors))
+    if misalignment:
+        new_shape = np.subtract(array.shape, np.mod(array.shape, scale_factors))
+        slices = tuple(slice(0, s) for s in new_shape)
+        array = array[slices]
+    return array
+
 
 @attr.s
 class ResampledArrayConfig(ArrayConfig):
@@ -27,17 +48,81 @@ class ResampledArrayConfig(ArrayConfig):
         metadata={"help_text": "The Array that you want to upsample or downsample."}
     )
 
-    upsample: Coordinate = attr.ib(
+    _upsample: Coordinate = attr.ib(
         metadata={"help_text": "The amount by which to upsample!"}
     )
-    downsample: Coordinate = attr.ib(
+    _downsample: Coordinate = attr.ib(
         metadata={"help_text": "The amount by which to downsample!"}
     )
     interp_order: bool = attr.ib(
         metadata={"help_text": "The order of the interpolation!"}
    )
 
+    def preprocess(self, array: Array) -> Array:
+        """
+        Preprocess an array by resampling it to the desired voxel size.
+        """
+        if self.downsample is not None:
+            downsample = list(self.downsample)
+            for i, axis_name in enumerate(array.axis_names):
+                if "^" in axis_name:
+                    downsample = downsample[:i] + [1] + downsample[i:]
+            return Array(
+                data=downscale_dask(
+                    adjust_shape(array.data, downsample),
+                    windowed_mean,
+                    scale_factors=downsample,
+                ),
+                offset=array.offset,
+                voxel_size=array.voxel_size * self.downsample,
+                axis_names=array.axis_names,
+                units=array.units,
+            )
+        elif self.upsample is not None:
+            upsample = list(self.upsample)
+            for i, axis_name in enumerate(array.axis_names):
+                if "^" in axis_name:
+                    upsample = upsample[:i] + [1] + upsample[i:]
+
+            depth = [int(x > 1) for x in upsample]
+            trim_slicing = tuple(
+                slice(d * s, (-d * s)) if d > 0 else slice(None)
+                for d, s in zip(depth, upsample)
+            )
+
+            rescaled_arr = da.map_overlap(
+                lambda x: rescale(
+                    x, upsample, order=int(self.interp_order), preserve_range=True
+                )[trim_slicing],
+                array.data,
+                depth=depth,
+                boundary="reflect",
+                trim=False,
+                dtype=array.data.dtype,
+                chunks=tuple(c * u for c, u in zip(array.data.chunksize, upsample)),
+            )
+
+            return Array(
+                data=rescaled_arr,
+                offset=array.offset,
+                voxel_size=array.voxel_size / self.upsample,
+                axis_names=array.axis_names,
+                units=array.units,
+            )
+
+    @property
+    def upsample(self) -> Optional[Coordinate]:
+        if self._upsample is None:
+            return None
+        return Coordinate(self._upsample)
+
+    @property
+    def downsample(self) -> Optional[Coordinate]:
+        if self._downsample is None:
+            return None
+        return Coordinate(self._downsample)
+
     def array(self, mode: str = "r") -> Array:
-        # This is non trivial. We want to upsample or downsample the source
-        # array lazily. Not entirely sure how to do this with dask arrays.
-        raise NotImplementedError()
+        source_array = self.source_array_config.array(mode)
+
+        return self.preprocess(source_array)
diff --git a/dacapo/experiments/tasks/predictors/hot_distance_predictor.py b/dacapo/experiments/tasks/predictors/hot_distance_predictor.py
index f2ec4f874..ec27e3346 100644
--- a/dacapo/experiments/tasks/predictors/hot_distance_predictor.py
+++ b/dacapo/experiments/tasks/predictors/hot_distance_predictor.py
@@ -1,7 +1,6 @@
 from dacapo.experiments.arraytypes.probabilities import ProbabilityArray
 from .predictor import Predictor
 from dacapo.experiments import Model
-from dacapo.experiments.arraytypes import DistanceArray
 from dacapo.tmp import np_to_funlib_array
 from dacapo.utils.balance_weights import balance_weights
 
@@ -394,6 +393,7 @@ def __find_boundaries(self, labels):
         # bound.: 00000001000100000001000       2n - 1
 
         logger.debug(f"computing boundaries for {labels.shape}")
+        labels = labels.astype(np.uint8)
 
         dims = len(labels.shape)
         in_shape = labels.shape
diff --git a/dacapo/experiments/trainers/gunpowder_trainer.py b/dacapo/experiments/trainers/gunpowder_trainer.py
index 507151ad7..15da00a06 100644
--- a/dacapo/experiments/trainers/gunpowder_trainer.py
+++ b/dacapo/experiments/trainers/gunpowder_trainer.py
@@ -361,7 +361,9 @@ def iterate(self, num_iterations, model, optimizer, device):
                     ),
                 }
                 if mask is not None:
-                    snapshot_arrays["volumes/mask"] = mask
+                    snapshot_arrays["volumes/mask"] = np_to_funlib_array(
+                        mask[0], offset=target.offset, voxel_size=target.voxel_size
+                    )
                 logger.warning(
                     f"Saving Snapshot. Iteration: {iteration}, "
                     f"Loss: {loss.detach().cpu().numpy().item()}!"
diff --git a/dacapo/tmp.py b/dacapo/tmp.py
index 672745c90..6c455aff1 100644
--- a/dacapo/tmp.py
+++ b/dacapo/tmp.py
@@ -71,7 +71,7 @@ def create_from_identifier(
     return prepare_ds(
         out_path,
         shape=(*list_num_channels, *roi.shape / voxel_size),
-        offset=roi.offset / voxel_size,
+        offset=roi.offset,
         voxel_size=voxel_size,
         axis_names=axis_names,
         dtype=dtype,
diff --git a/dacapo/utils/pipeline.py b/dacapo/utils/pipeline.py
index 99f823eb9..9daef7c1f 100644
--- a/dacapo/utils/pipeline.py
+++ b/dacapo/utils/pipeline.py
@@ -83,45 +83,44 @@ class MakeRaw(gp.BatchFilter):
         process: Generate the raw image from the labels.
     """
 
-    class Pipeline:
-        def __init__(
-            self,
-            raw,
-            labels,
-            gaussian_noise_args: Iterable = (0.5, 0.1),
-            gaussian_noise_lim: float = 0.3,
-            gaussian_blur_args: Iterable = (0.5, 1.5),
-            membrane_like=True,
-            membrane_size=3,
-            inside_value=0.5,
-        ):
-            """
-            Initialize the Pipeline object.
-
-            Args:
-                raw: The raw data.
-                labels: The labels data.
-                gaussian_noise_args: Tuple of two floats representing the mean and standard deviation
-                    of the Gaussian noise to be added to the data. Default is (0.5, 0.1).
-                gaussian_noise_lim: The limit of the Gaussian noise. Default is 0.3.
-                gaussian_blur_args: Tuple of two floats representing the mean and standard deviation
-                    of the Gaussian blur to be applied to the data. Default is (0.5, 1.5).
-                membrane_like: Boolean indicating whether to apply membrane-like transformation to the data.
-                    Default is True.
-                membrane_size: The size of the membrane. Default is 3.
-                inside_value: The value to be assigned to the inside of the membrane. Default is 0.5.
-            Examples:
-                >>> Pipeline(raw="RAW", labels="LABELS", gaussian_noise_args=(0.5, 0.1), gaussian_noise_lim=0.3,
-                >>>          gaussian_blur_args=(0.5, 1.5), membrane_like=True, membrane_size=3, inside_value=0.5)
-            """
-            self.raw = raw
-            self.labels = labels
-            self.gaussian_noise_args = gaussian_noise_args
-            self.gaussian_noise_lim = gaussian_noise_lim
-            self.gaussian_blur_args = gaussian_blur_args
-            self.membrane_like = membrane_like
-            self.membrane_size = membrane_size
-            self.inside_value = inside_value
+    def __init__(
+        self,
+        raw,
+        labels,
+        gaussian_noise_args: Iterable = (0.5, 0.1),
+        gaussian_noise_lim: float = 0.3,
+        gaussian_blur_args: Iterable = (0.5, 1.5),
+        membrane_like=True,
+        membrane_size=3,
+        inside_value=0.5,
+    ):
+        """
+        Initialize the MakeRaw node.
+
+        Args:
+            raw: The raw data.
+            labels: The labels data.
+            gaussian_noise_args: Tuple of two floats representing the mean and standard deviation
+                of the Gaussian noise to be added to the data. Default is (0.5, 0.1).
+            gaussian_noise_lim: The limit of the Gaussian noise. Default is 0.3.
+            gaussian_blur_args: Tuple of two floats representing the mean and standard deviation
+                of the Gaussian blur to be applied to the data. Default is (0.5, 1.5).
+            membrane_like: Boolean indicating whether to apply membrane-like transformation to the data.
+                Default is True.
+            membrane_size: The size of the membrane. Default is 3.
+            inside_value: The value to be assigned to the inside of the membrane. Default is 0.5.
+        Examples:
+            >>> MakeRaw(raw="RAW", labels="LABELS", gaussian_noise_args=(0.5, 0.1), gaussian_noise_lim=0.3,
+            >>>         gaussian_blur_args=(0.5, 1.5), membrane_like=True, membrane_size=3, inside_value=0.5)
+        """
+        self.raw = raw
+        self.labels = labels
+        self.gaussian_noise_args = gaussian_noise_args
+        self.gaussian_noise_lim = gaussian_noise_lim
+        self.gaussian_blur_args = gaussian_blur_args
+        self.membrane_like = membrane_like
+        self.membrane_size = membrane_size
+        self.inside_value = inside_value
 
     def setup(self):
         """
diff --git a/pyproject.toml b/pyproject.toml
index 730611ad5..c1bc82fb8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -59,6 +59,7 @@ dependencies = [
     "boto3",
     "matplotlib",
-    "funlib.persistence @ git+https://github.com/funkelab/funlib.persistence.git@ome-ngff"
+    "funlib.persistence @ git+https://github.com/funkelab/funlib.persistence.git@ome-ngff",
+    "xarray-multiscale",
 ]
 
 # extras
@@ -201,6 +202,7 @@ module = [
     "napari.*",
     "empanada.*",
     "IPython.*",
+    "xarray_multiscale.*",
 ]
 ignore_missing_imports = true
diff --git a/tests/components/test_gp_arraysource.py b/tests/components/test_gp_arraysource.py
index 58a4b23ba..68a5b5ae2 100644
--- a/tests/components/test_gp_arraysource.py
+++ b/tests/components/test_gp_arraysource.py
@@ -30,6 +30,5 @@ def test_gp_dacapo_array_source(array_config):
     batch = source_node.request_batch(request)
     data = batch[key].data
     if data.dtype == bool:
-        raise ValueError("Data should not be bools")
         data = data.astype(np.uint8)
     assert (data - array[array.roi]).sum() == 0
diff --git a/tests/components/test_preprocessing.py b/tests/components/test_preprocessing.py
new file mode 100644
index 000000000..14aa7d1df
--- /dev/null
+++ b/tests/components/test_preprocessing.py
@@ -0,0 +1,32 @@
+from dacapo.experiments.datasplits.datasets.arrays.resampled_array_config import (
+    ResampledArrayConfig,
+)
+
+import numpy as np
+from funlib.persistence import Array
+from funlib.geometry import Coordinate
+
+
+def test_resample():
+    # test downsampling arrays of length 10 and 11 by a factor of 2 to check that cropping works
+    for top in [11, 12]:
+        arr = Array(np.array(np.arange(1, top)), offset=(0,), voxel_size=(3,))
+        resample_config = ResampledArrayConfig(
+            "test_resample", None, upsample=None, downsample=(2,), interp_order=1
+        )
+        resampled = resample_config.preprocess(arr)
+        assert resampled.voxel_size == Coordinate((6,))
+        assert resampled.shape == (5,)
+        assert np.allclose(resampled[:], np.array([1.5, 3.5, 5.5, 7.5, 9.5]))
+
+    # test 2D array
+    arr = Array(
+        np.array(np.arange(1, 11).reshape(5, 2).T), offset=(0, 0), voxel_size=(3, 3)
+    )
+    resample_config = ResampledArrayConfig(
+        "test_resample", None, upsample=None, downsample=(2, 1), interp_order=1
+    )
+    resampled = resample_config.preprocess(arr)
+    assert resampled.voxel_size == Coordinate(6, 3)
+    assert resampled.shape == (1, 5)
+    assert np.allclose(resampled[:], np.array([[1.5, 3.5, 5.5, 7.5, 9.5]]))
diff --git a/tests/operations/helpers.py b/tests/operations/helpers.py
index 74fb43208..d4be3f708 100644
--- a/tests/operations/helpers.py
+++ b/tests/operations/helpers.py
@@ -2,6 +2,7 @@
 from funlib.persistence import prepare_ds
 from funlib.geometry import Coordinate
 
+from dacapo.experiments.trainers import GunpowderTrainerConfig
 from dacapo.experiments.datasplits import SimpleDataSplitConfig
 from dacapo.experiments.tasks import (
     DistanceTaskConfig,
@@ -13,6 +14,19 @@
 from pathlib import Path
 
 
+def build_test_train_config(multiprocessing: bool):
+    """
+    Builds the simplest possible trainer given the parameters.
+    """
+    return GunpowderTrainerConfig(
+        name="test_trainer",
+        batch_size=1,
+        learning_rate=0.0001,
+        num_data_fetchers=1 + multiprocessing,
+        snapshot_interval=1,
+    )
+
+
 def build_test_data_config(
     tmpdir: Path, data_dims: int, channels: bool, upsample: bool, task_type: str
 ):
@@ -104,9 +118,7 @@ def build_test_architecture_config(
     data_dims: int,
     architecture_dims: int,
     channels: bool,
-    batch_norm: bool,
     upsample: bool,
-    use_attention: bool,
     padding: str,
 ):
     """
@@ -160,7 +172,5 @@ def build_test_architecture_config(
         kernel_size_up=kernel_size_up,
         constant_upsample=True,
         upsample_factors=upsample_factors,
-        batch_norm=batch_norm,
-        use_attention=use_attention,
         padding=padding,
     )
diff --git a/tests/operations/test_mini.py b/tests/operations/test_mini.py
index f50705538..57b25bdf2 100644
--- a/tests/operations/test_mini.py
+++ b/tests/operations/test_mini.py
@@ -1,16 +1,19 @@
 from ..fixtures import *
 from .helpers import (
+    build_test_train_config,
     build_test_data_config,
     build_test_task_config,
     build_test_architecture_config,
 )
+from dacapo.store.create_store import create_array_store
 from dacapo.experiments import Run
 from dacapo.train import train_run
 from dacapo.validate import validate_run
 
+import zarr
+
 import pytest
-from pytest_lazy_fixtures import lf
 
 from dacapo.experiments.run_config import RunConfig
 
 
@@ -22,27 +25,21 @@
 @pytest.mark.parametrize("data_dims", [2, 3])
 @pytest.mark.parametrize("channels", [True, False])
 @pytest.mark.parametrize("task", ["distance", "onehot", "affs"])
-@pytest.mark.parametrize("trainer", [lf("gunpowder_trainer")])
 @pytest.mark.parametrize("architecture_dims", [2, 3])
 @pytest.mark.parametrize("upsample", [True, False])
-# @pytest.mark.parametrize("batch_norm", [True, False])
-@pytest.mark.parametrize("batch_norm", [False])
-# @pytest.mark.parametrize("use_attention", [True, False])
-@pytest.mark.parametrize("use_attention", [False])
 @pytest.mark.parametrize("padding", ["valid", "same"])
 @pytest.mark.parametrize("func", ["train", "validate"])
+@pytest.mark.parametrize("multiprocessing", [False])
 def test_mini(
     tmpdir,
     data_dims,
     channels,
     task,
-    trainer,
     architecture_dims,
-    batch_norm,
     upsample,
-    use_attention,
     padding,
     func,
+    multiprocessing,
 ):
     # Invalid configurations:
     if data_dims == 2 and architecture_dims == 3:
@@ -50,6 +47,8 @@ def test_mini(
         # TODO: maybe check that an appropriate warning is raised somewhere
         return
 
+    trainer_config = build_test_train_config(multiprocessing)
+
     data_config = build_test_data_config(
         tmpdir,
         data_dims,
@@ -62,9 +61,7 @@ def test_mini(
         data_dims,
         architecture_dims,
         channels,
-        batch_norm,
         upsample,
-        use_attention,
         padding,
     )
 
@@ -72,7 +69,7 @@ def test_mini(
         name=f"test_{func}",
         task_config=task_config,
         architecture_config=architecture_config,
-        trainer_config=trainer,
+        trainer_config=trainer_config,
         datasplit_config=data_config,
         repetition=0,
         num_iterations=1,
@@ -81,5 +78,20 @@ def test_mini(
 
     if func == "train":
         train_run(run)
+        array_store = create_array_store()
+        snapshot_container = array_store.snapshot_container(run.name).container
+        assert snapshot_container.exists()
+        assert all(
+            x in zarr.open(snapshot_container)
+            for x in [
+                "0/volumes/raw",
+                "0/volumes/gt",
+                "0/volumes/target",
+                "0/volumes/weight",
+                "0/volumes/prediction",
+                "0/volumes/gradients",
+                "0/volumes/mask",
+            ]
+        )
     elif func == "validate":
         validate_run(run, 1)
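
A note on the `da.zeros(...)`-then-assign → `da.stack(...)` refactors in `binarize_array_config.py` and `concat_array_config.py`: writing channels into a pre-allocated dask array one slice at a time relies on in-place assignment, which dask only supports in limited cases and which defeats the goal of keeping the pipeline lazy. Building one lazy mask per group and stacking them along a new channel axis keeps everything deferred. A minimal standalone sketch of the pattern (the `labels`, `groupings`, and `background` values here are toy stand-ins, not dacapo's API):

```python
import dask.array as da
import numpy as np

# toy inputs standing in for dacapo's label array and groupings config
labels = da.from_array(np.array([[0, 1, 2], [2, 3, 0]]), chunks=2)
groupings = [("membrane", [1, 2]), ("mito", [3]), ("foreground", [])]
background = 0

# one lazy boolean mask per group; an empty id list means "anything but background"
channels = [
    da.isin(labels, ids) if len(ids) > 0 else labels != background
    for _, ids in groupings
]
out = da.stack(channels, axis=0).astype(np.uint8)  # channel axis first, still lazy
print(out.compute())  # (3, 2, 3) array of 0s and 1s
```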
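The downsampling path added to `resampled_array_config.py` first crops the input to a multiple of the scale factors (`adjust_shape`) and then applies `xarray-multiscale`'s windowed mean, which is exactly what the new `tests/components/test_preprocessing.py` pins down. The same result can be reproduced standalone; this sketch assumes `xarray-multiscale` is installed and mirrors the call signature used in the diff:

```python
import dask.array as da
import numpy as np
from xarray_multiscale import windowed_mean
from xarray_multiscale.multiscale import downscale_dask

data = da.from_array(np.arange(1, 12), chunks=4)  # length 11: not divisible by 2

# crop to a multiple of the scale factors, as adjust_shape does in the diff
factors = (2,)
new_shape = np.subtract(data.shape, np.mod(data.shape, factors))
data = data[tuple(slice(0, s) for s in new_shape)]  # length 10

out = downscale_dask(data, windowed_mean, scale_factors=factors)
print(out.compute())  # [1.5 3.5 5.5 7.5 9.5], matching the test's expectation
```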
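The upsampling path is subtler: `skimage.transform.rescale` needs context beyond each chunk border, so the diff wraps it in `dask.array.map_overlap` with a one-voxel halo per upsampled axis and `trim=False`, then slices the rescaled halo (`depth * factor` voxels per side) off by hand; since the `depth` entries are 0 or 1, the trim condition has to fire at `d > 0`. A self-contained sketch of that mechanic, with hypothetical sizes rather than dacapo's config:

```python
import dask.array as da
import numpy as np
from skimage.transform import rescale

factor = (2,)  # upsample by 2
depth = (1,)   # one input voxel of context on each side of every chunk
data = da.from_array(np.arange(10, dtype=float), chunks=5)

def upsample_block(block):
    up = rescale(block, factor, order=1, preserve_range=True)
    # the halo is now depth * factor voxels wide on each side: cut it off
    trim = tuple(
        slice(d * f, -d * f) if d > 0 else slice(None)
        for d, f in zip(depth, factor)
    )
    return up[trim]

out = da.map_overlap(
    upsample_block,
    data,
    depth=depth,
    boundary="reflect",
    trim=False,  # trimming is done manually inside the block function
    dtype=data.dtype,
    chunks=tuple(c * f for c, f in zip(data.chunksize, factor)),
)
print(out.shape)  # (20,): each chunk of 5 becomes a chunk of 10
```

Each block arrives with its reflected halo (size 7 for an interior chunk of 5), is rescaled to 14, and is trimmed back to 10, so the declared output chunks line up exactly with what the block function returns.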