ZarrWrite.pyscro

import os
from pathlib import Path

import zarr

_container_extension = '.zarr'


def split_path_at_container(path: str):
    # Check whether the path points into a container (a directory whose name
    # ends in '.zarr'). Returns [container_path, dataset_path_inside_it], or
    # (None, None) if no container is found anywhere in the path.
    result = None, None
    pathobj = Path(path)
    if pathobj.suffix == _container_extension:
        result = [path, '']
    else:
        # Path.parents iterates from the immediate parent up to the root, so
        # stop at the first (innermost) container found.
        for parent in pathobj.parents:
            if parent.suffix == _container_extension:
                # Split once so a second '.zarr' in the dataset path survives,
                # then restore the extension on the container part.
                result = path.split(parent.suffix, 1)
                result[0] += parent.suffix
                break
    return result
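
# A quick sketch of the expected behaviour (hypothetical paths, shown here as
# comments rather than executed):
#
#   split_path_at_container('/data/export.zarr')
#   -> ['/data/export.zarr', '']
#   split_path_at_container('/data/export.zarr/volumes/raw')
#   -> ['/data/export.zarr', '/volumes/raw']
#   split_path_at_container('/data/plain_dir')
#   -> (None, None)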


class ZarrWrite(PyScriptObject):
    # PyScriptObject, HxPortDoIt, HxPortFilename, and hx_message are supplied
    # by the Amira Python scripting environment; they are not imported here.

    def __init__(self):
        # Accept a single uniform scalar field as input.
        self.data.valid_types = ['HxUniformScalarField3']
        self.do_it = HxPortDoIt(self, 'write', 'Save Zarr')
        self.output_dir = HxPortFilename(self, 'outputDir', 'Output Directory')
        self.output_dir.mode = HxPortFilename.LOAD_DIRECTORY
        self.container = None
        self.dataset = None
        self.container_path = None
        self.dataset_path = None
        self.slices = None

    def update(self):
        if self.output_dir.is_new and self.output_dir.filenames is not None:
            if ".zarr" in self.output_dir.filenames:
                # The chosen path points into an existing container.
                self.container_path, self.dataset_path = split_path_at_container(
                    self.output_dir.filenames)
            else:
                # Plain directory: create a new container with a unique name.
                dir_path = self.output_dir.filenames
                container_name = 'amira_export'
                self.container_path = self.if_name_exists(dir_path, container_name, 0)
                self.dataset_path = ''
            self.container = self.access_container(mode='a')
            self.dataset = self.container[self.dataset_path]

    def access_container(self, mode):
        # NestedDirectoryStore lays chunks out in nested directories on disk.
        store_path = zarr.NestedDirectoryStore(self.container_path)
        container = zarr.open(store=store_path, mode=mode)
        return container

    def compute(self):
        if not self.do_it.was_hit:
            return
        if self.data.source() is None:
            return
        # Slices and array have to be transposed because Amira uses x, y, z
        # axis ordering, while the zarr datasets use z, y, x.
        self.slices = self.bbox_to_slices()
        output = self.data.source().get_array().T
        # Trim stray hyphens from the Amira module name.
        ds_name = self.data.source().name.strip('-')
        if isinstance(self.dataset, zarr.Group):
            print('Saving {0} to {1}'.format(ds_name, self.dataset.name))
            self.dataset[ds_name] = output
            self.add_multiscales_metadata(self.dataset, ds_name)
            print("Added voxel size and offset attributes")
            print('Save complete')
        elif isinstance(self.dataset, zarr.Array):
            if self.dataset.shape == output.shape:
                print('Preparing to save {0} to {1}'.format(ds_name, self.dataset.name))
                parent_group = self.access_parent(self.dataset)
                # Use the final path component so that both the assignment and
                # the multiscales 'path' entry are relative to the parent group.
                ds_name = os.path.basename(self.dataset.name)
                parent_group[ds_name] = output
                self.add_multiscales_metadata(parent_group, ds_name)
                print("Added voxel size and offset attributes")
                print('Save complete')
            else:
                hx_message.error(
                    message="Cannot overwrite the target dataset: its "
                            "dimensions differ from those of the Amira array.")

    def bbox_to_slices(self):
        # Convert the bounding box of the input data (in physical units) to a
        # tuple of index slices.
        shape = self.data.source().get_array().shape
        bbox_starts, bbox_stops = self.data.source().bounding_box
        # Infer the sampling resolution per axis by dividing the extent of the
        # data by the number of sampling intervals (shape - 1).
        rscale = tuple((bsto - bsta) / (s - 1)
                       for bsto, bsta, s in zip(bbox_stops, bbox_starts, shape))
        origins = tuple(int(bsta / r) for bsta, r in zip(bbox_starts, rscale))
        slices = tuple(slice(o, o + s) for o, s in zip(origins, shape))
        return slices
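
    # Worked example (assumed numbers): for an axis of shape 101 whose
    # bounding box runs from 40.0 to 240.0, the step is
    # (240.0 - 40.0) / (101 - 1) = 2.0, the origin index is
    # int(40.0 / 2.0) = 20, and the resulting slice is slice(20, 121).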

    def separate_store_path(self, store, path):
        # Walk up the store path until the component naming the container
        # ('.zarr' or '.n5') is found, moving the remainder onto the dataset
        # path.
        new_store, path_prefix = os.path.split(store)
        if ".zarr" in path_prefix or ".n5" in path_prefix:
            return store, path
        return self.separate_store_path(
            new_store, os.path.join(path_prefix, path).replace("\\", "/"))
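
    # Sketch (hypothetical layout): for a store path of
    # '/data/export.zarr/volumes' and a node path of 'raw', the recursion
    # returns ('/data/export.zarr', 'volumes/raw').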

    def access_parent(self, node):
        # Open the parent group of a zarr array or group.
        store_path, node_path = self.separate_store_path(node.store.path, node.path)
        if node_path == "":
            raise RuntimeError(
                f"{node.name} is in the root group of the {node.store.path} store."
            )
        return zarr.open(store=store_path, path=os.path.split(node_path)[0], mode="a")

    def if_name_exists(self, path, zarr_name, copy_num):
        # Build a container path that does not collide with an existing one,
        # appending '_(n)' to the name until a free path is found.
        if copy_num == 0:
            zarr_path = os.path.join(path, f"{zarr_name}.zarr").replace("\\", "/")
        else:
            zarr_path = os.path.join(path, f"{zarr_name}_({copy_num}).zarr").replace("\\", "/")
        if not os.path.exists(zarr_path):
            return zarr_path
        return self.if_name_exists(path, zarr_name, copy_num + 1)
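
    # For example, successive exports into the same directory would yield
    # 'amira_export.zarr', 'amira_export_(1).zarr', 'amira_export_(2).zarr', ...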

    def add_multiscales_metadata(self, z_group, ds_name):
        # Write OME-NGFF (version 0.4) multiscales metadata onto the group.
        # Default axis order: z, y, x.
        axes = ['z', 'y', 'x']
        # Default unit: nanometer.
        unit = 'nanometer'
        # Offset values, in z, y, x order, parsed from the Amira port text.
        offset = self.data.source().ports.PhysicalSize.text.split('from')[1].strip().split(', ')[::-1]
        # Voxel size, in z, y, x order.
        voxel_size = self.data.source().ports.VoxelSize.text.split(" x ")[::-1]
        z_attrs: dict = {"multiscales": [{}]}
        z_attrs["multiscales"][0]["axes"] = [
            {"name": axis, "type": "space", "unit": unit} for axis in axes
        ]
        z_attrs["multiscales"][0]["coordinateTransformations"] = [
            {"scale": [1.0, 1.0, 1.0], "type": "scale"}
        ]
        z_attrs["multiscales"][0]["datasets"] = [
            {
                "coordinateTransformations": [
                    {"scale": [float(i) for i in voxel_size], "type": "scale"},
                    {"translation": [float(i) for i in offset], "type": "translation"},
                ],
                "path": ds_name,
            }
        ]
        z_attrs["multiscales"][0]["name"] = ""
        z_attrs["multiscales"][0]["version"] = "0.4"
        z_group.attrs.update(z_attrs)
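
    # For illustration, the resulting .zattrs on the group would look roughly
    # like this (voxel size, offset, and dataset name are made-up values):
    #
    # {
    #   "multiscales": [{
    #     "axes": [
    #       {"name": "z", "type": "space", "unit": "nanometer"},
    #       {"name": "y", "type": "space", "unit": "nanometer"},
    #       {"name": "x", "type": "space", "unit": "nanometer"}
    #     ],
    #     "coordinateTransformations": [{"scale": [1.0, 1.0, 1.0], "type": "scale"}],
    #     "datasets": [{
    #       "coordinateTransformations": [
    #         {"scale": [8.0, 8.0, 8.0], "type": "scale"},
    #         {"translation": [0.0, 0.0, 0.0], "type": "translation"}
    #       ],
    #       "path": "my-dataset"
    #     }],
    #     "name": "",
    #     "version": "0.4"
    #   }]
    # }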