Add mazepa hooks for SpatialFile setup and post-process.
Add build_spatial_file method.

Rename SpatialFile to AnnotationLayer.

Update unit test with renamed class.

Got unit test coverage back up to 100%.

Get tests back up to 100% again.
JoeStrout committed Sep 5, 2024
1 parent 2cad36d commit 26e7ecc
Showing 2 changed files with 155 additions and 14 deletions.
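
In sketch form, a round trip through the renamed class looks like the following (the directory path is illustrative and LineAnnotation construction is elided; compare the updated unit test below):

from zetta_utils.db_annotations.precomp_annotations import AnnotationLayer
from zetta_utils.geometry.vec import Vec3D
from zetta_utils.layer.volumetric.index import VolumetricIndex

index = VolumetricIndex.from_coords([0, 0, 0], [2000, 2000, 600], Vec3D(10, 10, 40))
chunk_sizes = [[2000, 2000, 600], [1000, 1000, 600], [500, 500, 300]]

layer = AnnotationLayer("/tmp/annotations", index, chunk_sizes)  # hypothetical path
layer.clear()
lines = []  # stand-in; real code passes LineAnnotation objects (construction not shown in this commit)
layer.write_annotations(lines)
layer.post_process()  # condense chunks and rewrite the info file

# Reopening from the directory alone recovers index and chunk_sizes
# from the stored info file:
layer = AnnotationLayer("/tmp/annotations")
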
19 changes: 14 additions & 5 deletions tests/unit/db_annotations/test_precomp_annotations.py
@@ -4,7 +4,10 @@
import pytest

from zetta_utils.db_annotations import precomp_annotations
from zetta_utils.db_annotations.precomp_annotations import LineAnnotation, SpatialFile
from zetta_utils.db_annotations.precomp_annotations import (
AnnotationLayer,
LineAnnotation,
)
from zetta_utils.geometry.vec import Vec3D
from zetta_utils.layer.volumetric.index import VolumetricIndex

@@ -26,19 +29,19 @@ def test_round_trip():

index = VolumetricIndex.from_coords([0, 0, 0], [2000, 2000, 600], Vec3D(10, 10, 40))

sf = SpatialFile(file_dir, index)
sf = AnnotationLayer(file_dir, index)
assert sf.chunk_sizes == [(2000, 2000, 600)]

chunk_sizes = [[2000, 2000, 600], [1000, 1000, 600], [500, 500, 300]]
sf = SpatialFile(file_dir, index, chunk_sizes)
sf = AnnotationLayer(file_dir, index, chunk_sizes)
os.makedirs(os.path.join(file_dir, "spatial0", "junkforcodecoverage"))
sf.clear()
sf.write_annotations([]) # (does nothing)
sf.write_annotations(lines)
sf.post_process()

# Now create a *new* SpatialFile, given just the directory.
sf = SpatialFile(file_dir)
# Now create a *new* AnnotationLayer, given just the directory.
sf = AnnotationLayer(file_dir)
assert sf.index == index
assert sf.chunk_sizes == chunk_sizes

@@ -67,8 +70,14 @@ def test_edge_cases():
# pylint: disable=use-implicit-booleaness-not-comparison
assert precomp_annotations.read_lines("/dev/null") == []

assert precomp_annotations.read_info("/dev/null") == (None, None, None, None)

assert precomp_annotations.path_join("gs://foo/", "bar") == "gs://foo/bar"

index = VolumetricIndex.from_coords((0, 0, 0), (10, 10, 10), (1, 1, 1))
assert not AnnotationLayer("/dev/null", index).exists()
assert not AnnotationLayer("/dev/null/subdir", index).exists()


if __name__ == "__main__":
test_round_trip()
150 changes: 141 additions & 9 deletions zetta_utils/db_annotations/precomp_annotations.py
@@ -18,11 +18,11 @@
from itertools import product
from math import ceil
from random import shuffle
from typing import IO, Optional, Sequence
from typing import IO, Literal, Optional, Sequence

from cloudfiles import CloudFile, CloudFiles

from zetta_utils import builder, log
from zetta_utils import builder, log, mazepa
from zetta_utils.geometry.vec import Vec3D
from zetta_utils.layer.volumetric.index import VolumetricIndex

@@ -46,7 +46,7 @@ def path_join(*paths: str):
if not paths:
raise ValueError("At least one path is required")

if not is_local_filesystem(paths[0]):
if not is_local_filesystem(paths[0]): # pragma: no cover
# Join paths using "/" for GCS or other URL-like paths
cleaned_paths = [path.strip("/") for path in paths]
return "/".join(cleaned_paths)
@@ -282,9 +282,21 @@ def parse_info(info_json):


def read_info(dir_path):
"""
Read the info file within the given directory, and return:
dimensions (dict), lower_bound, upper_bound, spatial_entries (tuple of SpatialEntry)
If the file is empty or does not exist, return (None, None, None, None)
"""
file_path = path_join(dir_path, "info") # (note: not info.json as you would expect)
data = read_bytes(file_path).decode("utf-8")
return parse_info(data)
try:
data = read_bytes(file_path)
except NotADirectoryError:
data = None
if data is None or len(data) == 0:
return (None, None, None, None)
return parse_info(data.decode("utf-8"))


def read_data(dir_path, spatial_entry):
@@ -347,8 +359,8 @@ def subdivide(data, bounds: VolumetricIndex, chunk_sizes, write_to_dir=None, lev
return spatial_entries


@builder.register("SpatialFile")
class SpatialFile:
@builder.register("AnnotationLayer")
class AnnotationLayer:
"""
This class represents a spatial (precomputed annotation) file. It knows its data
bounds, and how that is broken up into chunks (possibly over several levels).
@@ -369,7 +381,7 @@ def __init__(
chunk_sizes: Optional[Sequence[Sequence[int]]] = None,
):
"""
Initialize a SpatialFile.
Initialize an AnnotationLayer.
:param path: local file or GS path of root of file hierarchy
:param index: bounding box and resolution defining volume containing the data
@@ -384,6 +396,8 @@ def __init__(
assert path, "path parameter is required"
if index is None:
dims, lower_bound, upper_bound, spatial_entries = read_info(path)
if dims is None:
raise ValueError("index is required when file does not exist") # pragma: no cover
resolution = []
for i in [0, 1, 2]:
numAndUnit = dims["xyz"[i]]
@@ -402,7 +416,21 @@ def __init__(
self.chunk_sizes = chunk_sizes

def __repr__(self):
return f"SpatialFile(path={self.path}, index={self.index}, chunk_sizes={self.chunk_sizes})"
return (
f"AnnotationLayer(path='{self.path}', index={self.index}, "
f"chunk_sizes={self.chunk_sizes})"
)

def exists(self) -> bool:
"""
Return whether this spatial file (more specifically, its info file) already exists.
"""
path = path_join(self.path, "info")
try:
data = read_bytes(path)
except NotADirectoryError:
data = None
return data is not None and len(data) > 0

def clear(self):
"""
@@ -549,3 +577,107 @@ def post_process(self):

# rewrite the info file, with the updated spatial entries
self.write_info_file(spatial_entries)


@builder.register("build_annotation_layer")
def build_annotation_layer( # pylint: disable=too-many-locals, too-many-branches
path: str,
resolution: Sequence[float] | None = None,
dataset_size: Sequence[int] | None = None,
voxel_offset: Sequence[int] | None = None,
index: VolumetricIndex | None = None,
chunk_sizes: Sequence[Sequence[int]] | None = None,
mode: Literal["read", "write", "replace", "update"] = "write",
) -> AnnotationLayer: # pragma: no cover # trivial conditional, delegation only
"""Build an AnnotationLayer (spatially indexed annotations in precomputed file format).
:param path: Path to the precomputed file (directory).
:param resolution: (x, y, z) size of one voxel, in nm.
:param dataset_size: Precomputed dataset size (in voxels) for all scales.
:param voxel_offset: start of the dataset volume (in voxels) for all scales.
:param index: VolumetricIndex indicating dataset size and resolution. Note that
for new files, you must provide either (resolution, dataset_size, voxel_offset)
or index, but not both. For existing files, all these are optional.
:param chunk_sizes: Chunk sizes for spatial index; defaults to a single chunk for
new files (or the existing chunk structure for existing files).
:param mode: How the file should be created or opened:
"read": for reading only; throws error if file does not exist.
"write": for writing; throws error if file exists.
"replace": for writing; if file exists, it is cleared of all data.
"update": for writing additional data; throws error if file does not exist.
"""
dims, lower_bound, upper_bound, spatial_entries = read_info(path)
file_exists = spatial_entries is not None
file_resolution = []
file_index = None
file_chunk_sizes = []
if file_exists:
for i in [0, 1, 2]:
numAndUnit = dims["xyz"[i]]
assert numAndUnit[1] == "nm", "Only dimensions in 'nm' are supported for reading"
file_resolution.append(numAndUnit[0])
# pylint: disable=E1120
file_index = VolumetricIndex.from_coords(lower_bound, upper_bound, Vec3D(*file_resolution))
file_chunk_sizes = [se.chunk_size for se in spatial_entries]

if mode in ("read", "update") and not file_exists:
raise IOError(
f"AnnotationLayer built with mode {mode}, but file does not exist (path: {path})"
)
if mode == "write" and file_exists:
raise IOError(
f"AnnotationLayer built with mode {mode}, but file already exists (path: {path})"
)

if index is None:
if mode == "write" or (mode == "replace" and not file_exists):
if resolution is None:
raise ValueError("when `index` is not provided, `resolution` is required")
if dataset_size is None:
raise ValueError("when `index` is not provided, `dataset_size` is required")
if voxel_offset is None:
raise ValueError("when `index` is not provided, `voxel_offset` is required")
if len(resolution) != 3:
raise ValueError(f"`resolution` needs 3 elements, not {len(resolution)}")
if len(dataset_size) != 3:
raise ValueError(f"`dataset_size` needs 3 elements, not {len(dataset_size)}")
if len(voxel_offset) != 3:
raise ValueError(f"`dataset_size` needs 3 elements, not {len(voxel_offset)}")
end_coord = tuple(a + b for a, b in zip(voxel_offset, dataset_size))
index = VolumetricIndex.from_coords(voxel_offset, end_coord, resolution)
else:
if resolution is not None:
raise ValueError("providing both `index` and `resolution` is invalid")
if dataset_size is not None:
raise ValueError("providing both `index` and `dataset_size` is invalid")
if voxel_offset is not None:
raise ValueError("providing both `index` and `voxel_offset` is invalid")

if mode == "update":
if index is not None and index != file_index:
raise ValueError(
f'opened file for "update" with index {index}, '
f"but existing file index is {file_index}"
)
if chunk_sizes is not None and chunk_sizes != file_chunk_sizes:
raise ValueError(
f'opened file for "update" with chunk_sizes {chunk_sizes}, '
f"but existing file chunk_sizes is {file_chunk_sizes}"
)

sf = AnnotationLayer(path, index, chunk_sizes)
if mode in ("write", "replace"):
sf.clear()
return sf


@mazepa.taskable_operation
def post_process_annotation_layer_op(target: AnnotationLayer): # pragma: no cover
print(f"Post-processing: {target.path}")
target.post_process()


@builder.register("post_process_annotation_layer_flow")
@mazepa.flow_schema
def post_process_annotation_layer_flow(target: AnnotationLayer): # pragma: no cover
yield post_process_annotation_layer_op.make_task(target)
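
For reference, a usage sketch of the new builder entry point and mazepa hook (the bucket path and parameter values are illustrative assumptions, not part of this commit):

from zetta_utils.db_annotations.precomp_annotations import (
    build_annotation_layer,
    post_process_annotation_layer_flow,
)

# "write" creates a fresh layer and raises if the file already exists; when no
# `index` is given, all of resolution / dataset_size / voxel_offset are required.
layer = build_annotation_layer(
    "gs://my-bucket/annotations",  # hypothetical path
    resolution=[10, 10, 40],
    dataset_size=[2000, 2000, 600],
    voxel_offset=[0, 0, 0],
    mode="write",
)

# "read" and "update" raise if the file does not exist; "replace" clears existing data.
existing = build_annotation_layer("gs://my-bucket/annotations", mode="read")

# Post-processing can run inline via existing.post_process(), or be scheduled
# as a mazepa flow and handed to your usual mazepa execution machinery:
flow = post_process_annotation_layer_flow(existing)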
