Skip to content

Commit

Permalink
Allow writing a subset of a compressed wkw dataset (#241)
Browse files Browse the repository at this point in the history
* Allow writing a subset of a compressed wkw dataset

* add parameter for compressed write

* make chunks in 'for_each_chunk' bounded to their bounding box

* reformat code

* Update wkcuber/api/View.py

Co-authored-by: Jonathan Striebel <[email protected]>

* reformat code

* change expected Exception

Co-authored-by: Jonathan Striebel <[email protected]>
  • Loading branch information
rschwanhold and jstriebel authored Sep 30, 2020
1 parent e794219 commit 887f1ce
Show file tree
Hide file tree
Showing 3 changed files with 217 additions and 11 deletions.
155 changes: 155 additions & 0 deletions tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@
import numpy as np
from shutil import rmtree, copytree

from wkw.wkw import WKWException

from wkcuber.api.Dataset import WKDataset, TiffDataset, TiledTiffDataset
from os import path, makedirs

from wkcuber.api.Layer import Layer
from wkcuber.api.Properties.DatasetProperties import TiffProperties, WKProperties
from wkcuber.api.TiffData.TiffMag import TiffReader
from wkcuber.api.bounding_box import BoundingBox
from wkcuber.compress import compress_mag_inplace
from wkcuber.mag import Mag
from wkcuber.utils import get_executor_for_args

Expand Down Expand Up @@ -1279,6 +1282,158 @@ def test_view_offsets():
pass


def test_writing_subset_of_compressed_data_multi_channel():
    """An unaligned write into a compressed multi-channel mag must store the new
    data while leaving the surrounding (old) data intact."""
    delete_dir("./testoutput/compressed_data/")

    # create uncompressed dataset
    write_data1 = (np.random.rand(3, 20, 40, 60) * 255).astype(np.uint8)
    WKDataset.create(
        os.path.abspath("./testoutput/compressed_data"), scale=(1, 1, 1)
    ).add_layer("color", Layer.COLOR_TYPE, num_channels=3).add_mag(
        "1", block_len=8, file_len=8
    ).write(
        write_data1
    )

    # compress data
    compress_mag_inplace(
        os.path.abspath("./testoutput/compressed_data/"),
        layer_name="color",
        mag=Mag("1"),
    )

    # open compressed dataset
    compressed_mag = (
        WKDataset("./testoutput/compressed_data").get_layer("color").get_mag("1")
    )

    write_data2 = (np.random.rand(3, 10, 10, 10) * 255).astype(np.uint8)
    compressed_mag.write(
        offset=(10, 20, 30), data=write_data2, allow_compressed_write=True
    )

    # BUG FIX: the original calls discarded the np.array_equal result, so the
    # test could never fail on a data mismatch; assert the comparisons.
    assert np.array_equal(
        write_data2, compressed_mag.read(offset=(10, 20, 30), size=(10, 10, 10))
    )  # the new data was written
    assert np.array_equal(
        write_data1[:, :10, :20, :30],
        compressed_mag.read(offset=(0, 0, 0), size=(10, 20, 30)),
    )  # the old data is still there


def test_writing_subset_of_compressed_data_single_channel():
    """An unaligned write into a compressed single-channel mag must store the new
    data while leaving the surrounding (old) data intact."""
    delete_dir("./testoutput/compressed_data/")

    # create uncompressed dataset
    write_data1 = (np.random.rand(20, 40, 60) * 255).astype(np.uint8)
    WKDataset.create(
        os.path.abspath("./testoutput/compressed_data"), scale=(1, 1, 1)
    ).add_layer("color", Layer.COLOR_TYPE).add_mag("1", block_len=8, file_len=8).write(
        write_data1
    )

    # compress data
    compress_mag_inplace(
        os.path.abspath("./testoutput/compressed_data/"),
        layer_name="color",
        mag=Mag("1"),
    )

    # open compressed dataset
    compressed_mag = (
        WKDataset("./testoutput/compressed_data").get_layer("color").get_mag("1")
    )

    write_data2 = (np.random.rand(10, 10, 10) * 255).astype(np.uint8)
    compressed_mag.write(
        offset=(10, 20, 30), data=write_data2, allow_compressed_write=True
    )

    # BUG FIX: the original calls discarded the np.array_equal result, so the
    # test could never fail on a data mismatch; assert the comparisons.
    assert np.array_equal(
        write_data2, compressed_mag.read(offset=(10, 20, 30), size=(10, 10, 10))
    )  # the new data was written
    assert np.array_equal(
        write_data1[:10, :20, :30],
        compressed_mag.read(offset=(0, 0, 0), size=(10, 20, 30)),
    )  # the old data is still there


def test_writing_subset_of_compressed_data():
    """Writing unaligned data to a compressed mag without
    allow_compressed_write=True must raise a WKWException."""
    delete_dir("./testoutput/compressed_data/")

    # create an uncompressed dataset and fill it with random data
    dataset = WKDataset.create(
        os.path.abspath("./testoutput/compressed_data"), scale=(1, 1, 1)
    )
    color_layer = dataset.add_layer("color", Layer.COLOR_TYPE)
    mag = color_layer.add_mag("1", block_len=8, file_len=8)
    mag.write((np.random.rand(20, 40, 60) * 255).astype(np.uint8))

    # compress the mag in place
    compress_mag_inplace(
        os.path.abspath("./testoutput/compressed_data/"),
        layer_name="color",
        mag=Mag("1"),
    )

    # reopen the now-compressed dataset
    compressed_mag = (
        WKDataset("./testoutput/compressed_data").get_layer("color").get_mag("1")
    )

    unaligned_data = (np.random.rand(10, 10, 10) * 255).astype(np.uint8)
    with pytest.raises(WKWException):
        # calling 'write' with unaligned data on compressed data without setting 'allow_compressed_write=True'
        compressed_mag.write(offset=(10, 20, 30), data=unaligned_data)


def test_writing_subset_of_chunked_compressed_data():
    """A compressed write through a bounded view must fail when the aligned
    region leaves the view, and succeed (preserving old data) when it fits."""
    delete_dir("./testoutput/compressed_data/")

    # create uncompressed dataset
    write_data1 = (np.random.rand(100, 200, 300) * 255).astype(np.uint8)
    WKDataset.create(
        os.path.abspath("./testoutput/compressed_data"), scale=(1, 1, 1)
    ).add_layer("color", Layer.COLOR_TYPE).add_mag("1", block_len=8, file_len=8).write(
        write_data1
    )

    # compress data
    compress_mag_inplace(
        os.path.abspath("./testoutput/compressed_data/"),
        layer_name="color",
        mag=Mag("1"),
    )

    # open compressed dataset
    compressed_view = WKDataset("./testoutput/compressed_data").get_view(
        "color", "1", size=(100, 200, 300), is_bounded=True
    )

    with pytest.raises(AssertionError):
        # the aligned data (offset=(0,0,0), size=(128, 128, 128)) is NOT fully within the bounding box of the view
        compressed_view.write(
            relative_offset=(10, 20, 30),
            data=(np.random.rand(90, 80, 70) * 255).astype(np.uint8),
            allow_compressed_write=True,
        )

    # the aligned data (offset=(0,0,0), size=(64, 64, 64)) IS fully within the bounding box of the view
    write_data2 = (np.random.rand(50, 40, 30) * 255).astype(np.uint8)
    compressed_view.write(
        relative_offset=(10, 20, 30), data=write_data2, allow_compressed_write=True
    )

    # BUG FIX: the original calls discarded the np.array_equal result, so the
    # test could never fail on a data mismatch; assert the comparisons.
    assert np.array_equal(
        write_data2, compressed_view.read(offset=(10, 20, 30), size=(50, 40, 30))
    )  # the new data was written
    assert np.array_equal(
        write_data1[:10, :20, :30],
        compressed_view.read(offset=(0, 0, 0), size=(10, 20, 30)),
    )  # the old data is still there


def test_add_symlink_layer():
delete_dir("./testoutput/wk_dataset_with_symlink")
delete_dir("./testoutput/simple_wk_dataset_copy")
Expand Down
4 changes: 2 additions & 2 deletions wkcuber/api/MagDataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ def close(self):
def read(self, size, offset=(0, 0, 0)) -> np.ndarray:
    """Read a cuboid of data of the given size at the given offset.

    Delegates to the underlying view; see View.read for the exact shape of
    the returned array.
    """
    return self.view.read(size, offset)

def write(self, data, offset=(0, 0, 0)):
def write(self, data, offset=(0, 0, 0), allow_compressed_write=False):
self._assert_valid_num_channels(data.shape)
self.view.write(data, offset)
self.view.write(data, offset, allow_compressed_write)
layer_properties = self.layer.dataset.properties.data_layers[self.layer.name]
current_offset_in_mag1 = layer_properties.get_bounding_box_offset()
current_size_in_mag1 = layer_properties.get_bounding_box_size()
Expand Down
69 changes: 60 additions & 9 deletions wkcuber/api/View.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import math

import numpy as np
from wkw import Dataset
from wkw import Dataset, wkw

from wkcuber.api.TiffData.TiffMag import TiffMag
from wkcuber.api.bounding_box import BoundingBox
Expand Down Expand Up @@ -36,7 +36,7 @@ def close(self):
self.dataset = None
self._is_opened = False

def write(self, data, relative_offset=(0, 0, 0)):
def write(self, data, relative_offset=(0, 0, 0), allow_compressed_write=False):
was_opened = self._is_opened
# assert the size of the parameter data is not in conflict with the attribute self.size
assert_non_negative_offset(relative_offset)
Expand All @@ -47,6 +47,9 @@ def write(self, data, relative_offset=(0, 0, 0)):
sum(x) for x in zip(self.global_offset, relative_offset)
)

if self._is_compressed() and allow_compressed_write:
absolute_offset, data = self._handle_compressed_write(absolute_offset, data)

if not was_opened:
self.open()

Expand Down Expand Up @@ -95,7 +98,7 @@ def check_bounds(self, offset, size) -> bool:
def assert_bounds(self, offset, size):
    """Raise an AssertionError if the region (offset, size) is not fully
    contained in this view (as judged by check_bounds)."""
    if self.check_bounds(offset, size):
        return
    raise AssertionError(
        f"Accessing data out of bounds: The passed parameter 'size' {size} exceeds the size of the current view ({self.size})"
    )

def for_each_chunk(self, work_on_chunk, job_args_per_chunk, chunk_size, executor):
Expand All @@ -107,19 +110,22 @@ def for_each_chunk(self, work_on_chunk, job_args_per_chunk, chunk_size, executor
chunk_size, chunk_size
):
relative_offset = np.array(chunk.topleft) - np.array(self.global_offset)
job_args.append(
(
self.get_view(size=chunk.size, relative_offset=relative_offset),
job_args_per_chunk,
)
)
view = self.get_view(size=chunk.size, relative_offset=relative_offset)
view.is_bounded = True
job_args.append((view, job_args_per_chunk))

# execute the work for each chunk
wait_and_ensure_success(executor.map_to_futures(work_on_chunk, job_args))

def _check_chunk_size(self, chunk_size):
    """Validate a chunk size for for_each_chunk; subclasses must override."""
    raise NotImplementedError

def _is_compressed(self):
    # Base views are never compressed; WKView overrides this based on the
    # wkw header's block type.
    return False

def _handle_compressed_write(self, absolute_offset, data):
    # No-op for uncompressed views: the data can be written as-is.
    # WKView overrides this to align the write to whole compressed files.
    return absolute_offset, data

def __enter__(self):
    # Context-manager support; cleanup happens in __exit__ (not shown here).
    return self

Expand Down Expand Up @@ -156,6 +162,51 @@ def _check_chunk_size(self, chunk_size):
f"The passed parameter 'chunk_size' {chunk_size} must be a multiple of (32, 32, 32)."
)

def _is_compressed(self):
    """Return True if the underlying wkw dataset stores compressed blocks."""
    compressed_block_types = (
        wkw.Header.BLOCK_TYPE_LZ4,
        wkw.Header.BLOCK_TYPE_LZ4HC,
    )
    return self.header.block_type in compressed_block_types

def _handle_compressed_write(self, absolute_offset, data):
    """Prepare an arbitrary write for a compressed dataset.

    Compressed wkw data can only be written in whole files, so an unaligned
    write is expanded to the enclosing file-aligned bounding box: the aligned
    region is read back, the new data is patched in, and the padded
    (offset, data) pair is returned. Already-aligned writes pass through
    unchanged.

    Raises AssertionError if the aligned region is not fully inside the
    bounding box of this view.
    """
    # one compressed file covers file_len * block_len voxels per dimension
    file_bb = np.full(3, self.header.file_len * self.header.block_len)
    absolute_offset_np = np.array(absolute_offset)
    margin_to_top_left = absolute_offset_np % file_bb
    aligned_offset = absolute_offset_np - margin_to_top_left
    bottom_right = absolute_offset_np + np.array(data.shape[-3:])
    # BUG FIX: the previous formula `file_bb - (bottom_right % file_bb)`
    # yielded a full `file_bb` (instead of 0) when bottom_right was already
    # aligned, padding an extra file and forcing an unnecessary read past the
    # intended region. `(-x) % m` is 0 for aligned x with numpy's modulo.
    margin_to_bottom_right = (-bottom_right) % file_bb
    aligned_bottom_right = bottom_right + margin_to_bottom_right
    aligned_shape = aligned_bottom_right - aligned_offset

    if (
        tuple(aligned_offset) != tuple(absolute_offset)
        or tuple(aligned_shape) != tuple(data.shape[-3:])
    ):
        # the data is not aligned
        # read the aligned bounding box
        try:
            aligned_data = self.read(offset=aligned_offset, size=aligned_shape)
        except AssertionError as e:
            raise AssertionError(
                f"Writing compressed data failed. The compressed file is not fully inside the bounding box of the view (offset={self.global_offset}, size={self.size}). "
                + str(e)
            )
        # slice covering the target region inside the aligned buffer;
        # the leading slice(None) keeps all channels
        index_slice = (
            slice(None, None),
            *(
                slice(start, end)
                for start, end in zip(
                    margin_to_top_left, bottom_right - aligned_offset
                )
            ),
        )
        # overwrite the specified data
        aligned_data[tuple(index_slice)] = data
        return tuple(aligned_offset), aligned_data
    else:
        return absolute_offset, data


class TiffView(View):
def open(self):
Expand Down

0 comments on commit 887f1ce

Please sign in to comment.