From 887f1ce869afe1b5cc158831bf75fa91d5a021c5 Mon Sep 17 00:00:00 2001 From: R Schwanhold Date: Wed, 30 Sep 2020 16:08:56 +0200 Subject: [PATCH] Allow to write a subset of a compressed wkw dataset (#241) * Allow to write a subset of a compressed wkw dataset * add parameter for compressed write * make chunks in 'for_each_chunk' bounded to their bounding box * reformat code * Update wkcuber/api/View.py Co-authored-by: Jonathan Striebel * reformat code * change expected Exception Co-authored-by: Jonathan Striebel --- tests/test_dataset.py | 155 ++++++++++++++++++++++++++++++++++++++ wkcuber/api/MagDataset.py | 4 +- wkcuber/api/View.py | 69 ++++++++++++++--- 3 files changed, 217 insertions(+), 11 deletions(-) diff --git a/tests/test_dataset.py b/tests/test_dataset.py index c83bdcf2e..80ea55397 100644 --- a/tests/test_dataset.py +++ b/tests/test_dataset.py @@ -7,6 +7,8 @@ import numpy as np from shutil import rmtree, copytree +from wkw.wkw import WKWException + from wkcuber.api.Dataset import WKDataset, TiffDataset, TiledTiffDataset from os import path, makedirs @@ -14,6 +16,7 @@ from wkcuber.api.Properties.DatasetProperties import TiffProperties, WKProperties from wkcuber.api.TiffData.TiffMag import TiffReader from wkcuber.api.bounding_box import BoundingBox +from wkcuber.compress import compress_mag_inplace from wkcuber.mag import Mag from wkcuber.utils import get_executor_for_args @@ -1279,6 +1282,158 @@ def test_view_offsets(): pass +def test_writing_subset_of_compressed_data_multi_channel(): + delete_dir("./testoutput/compressed_data/") + + # create uncompressed dataset + write_data1 = (np.random.rand(3, 20, 40, 60) * 255).astype(np.uint8) + WKDataset.create( + os.path.abspath("./testoutput/compressed_data"), scale=(1, 1, 1) + ).add_layer("color", Layer.COLOR_TYPE, num_channels=3).add_mag( + "1", block_len=8, file_len=8 + ).write( + write_data1 + ) + + # compress data + compress_mag_inplace( + os.path.abspath("./testoutput/compressed_data/"), + layer_name="color", + mag=Mag("1"), + ) + + # open compressed dataset + compressed_mag = ( + WKDataset("./testoutput/compressed_data").get_layer("color").get_mag("1") + ) + + write_data2 = (np.random.rand(3, 10, 10, 10) * 255).astype(np.uint8) + compressed_mag.write( + offset=(10, 20, 30), data=write_data2, allow_compressed_write=True + ) + + np.array_equal( + write_data2, compressed_mag.read(offset=(10, 20, 30), size=(10, 10, 10)) + ) # the new data was written + np.array_equal( + write_data1[:, :10, :20, :30], + compressed_mag.read(offset=(0, 0, 0), size=(10, 20, 30)), + ) # the old data is still there + + +def test_writing_subset_of_compressed_data_single_channel(): + delete_dir("./testoutput/compressed_data/") + + # create uncompressed dataset + write_data1 = (np.random.rand(20, 40, 60) * 255).astype(np.uint8) + WKDataset.create( + os.path.abspath("./testoutput/compressed_data"), scale=(1, 1, 1) + ).add_layer("color", Layer.COLOR_TYPE).add_mag("1", block_len=8, file_len=8).write( + write_data1 + ) + + # compress data + compress_mag_inplace( + os.path.abspath("./testoutput/compressed_data/"), + layer_name="color", + mag=Mag("1"), + ) + + # open compressed dataset + compressed_mag = ( + WKDataset("./testoutput/compressed_data").get_layer("color").get_mag("1") + ) + + write_data2 = (np.random.rand(10, 10, 10) * 255).astype(np.uint8) + compressed_mag.write( + offset=(10, 20, 30), data=write_data2, allow_compressed_write=True + ) + + np.array_equal( + write_data2, compressed_mag.read(offset=(10, 20, 30), size=(10, 10, 10)) + ) # the new data was written + np.array_equal( + write_data1[:10, :20, :30], + compressed_mag.read(offset=(0, 0, 0), size=(10, 20, 30)), + ) # the old data is still there + + +def test_writing_subset_of_compressed_data(): + delete_dir("./testoutput/compressed_data/") + + # create uncompressed dataset + WKDataset.create( + os.path.abspath("./testoutput/compressed_data"), scale=(1, 1, 1) + ).add_layer("color", Layer.COLOR_TYPE).add_mag("1", block_len=8, file_len=8).write( + (np.random.rand(20, 40, 60) * 255).astype(np.uint8) + ) + + # compress data + compress_mag_inplace( + os.path.abspath("./testoutput/compressed_data/"), + layer_name="color", + mag=Mag("1"), + ) + + # open compressed dataset + compressed_mag = ( + WKDataset("./testoutput/compressed_data").get_layer("color").get_mag("1") + ) + + with pytest.raises(WKWException): + # calling 'write' with unaligned data on compressed data without setting 'allow_compressed_write=True' + compressed_mag.write( + offset=(10, 20, 30), + data=(np.random.rand(10, 10, 10) * 255).astype(np.uint8), + ) + + +def test_writing_subset_of_chunked_compressed_data(): + delete_dir("./testoutput/compressed_data/") + + # create uncompressed dataset + write_data1 = (np.random.rand(100, 200, 300) * 255).astype(np.uint8) + WKDataset.create( + os.path.abspath("./testoutput/compressed_data"), scale=(1, 1, 1) + ).add_layer("color", Layer.COLOR_TYPE).add_mag("1", block_len=8, file_len=8).write( + write_data1 + ) + + # compress data + compress_mag_inplace( + os.path.abspath("./testoutput/compressed_data/"), + layer_name="color", + mag=Mag("1"), + ) + + # open compressed dataset + compressed_view = WKDataset("./testoutput/compressed_data").get_view( + "color", "1", size=(100, 200, 300), is_bounded=True + ) + + with pytest.raises(AssertionError): + # the aligned data (offset=(0,0,0), size=(128, 128, 128)) is NOT fully within the bounding box of the view + compressed_view.write( + relative_offset=(10, 20, 30), + data=(np.random.rand(90, 80, 70) * 255).astype(np.uint8), + allow_compressed_write=True, + ) + + # the aligned data (offset=(0,0,0), size=(64, 64, 64)) IS fully within the bounding box of the view + write_data2 = (np.random.rand(50, 40, 30) * 255).astype(np.uint8) + compressed_view.write( + relative_offset=(10, 20, 30), data=write_data2, allow_compressed_write=True + ) + + np.array_equal( + write_data2, compressed_view.read(offset=(10, 20, 30), size=(50, 40, 30)) + ) # the new data was written + np.array_equal( + write_data1[:10, :20, :30], + compressed_view.read(offset=(0, 0, 0), size=(10, 20, 30)), + ) # the old data is still there + + def test_add_symlink_layer(): delete_dir("./testoutput/wk_dataset_with_symlink") delete_dir("./testoutput/simple_wk_dataset_copy") diff --git a/wkcuber/api/MagDataset.py b/wkcuber/api/MagDataset.py index a5b50c4b9..37a77d484 100644 --- a/wkcuber/api/MagDataset.py +++ b/wkcuber/api/MagDataset.py @@ -26,9 +26,9 @@ def close(self): def read(self, size, offset=(0, 0, 0)) -> np.array: return self.view.read(size, offset) - def write(self, data, offset=(0, 0, 0)): + def write(self, data, offset=(0, 0, 0), allow_compressed_write=False): self._assert_valid_num_channels(data.shape) - self.view.write(data, offset) + self.view.write(data, offset, allow_compressed_write) layer_properties = self.layer.dataset.properties.data_layers[self.layer.name] current_offset_in_mag1 = layer_properties.get_bounding_box_offset() current_size_in_mag1 = layer_properties.get_bounding_box_size() diff --git a/wkcuber/api/View.py b/wkcuber/api/View.py index 7e1724d54..d987db296 100644 --- a/wkcuber/api/View.py +++ b/wkcuber/api/View.py @@ -1,7 +1,7 @@ import math import numpy as np -from wkw import Dataset +from wkw import Dataset, wkw from wkcuber.api.TiffData.TiffMag import TiffMag from wkcuber.api.bounding_box import BoundingBox @@ -36,7 +36,7 @@ def close(self): self.dataset = None self._is_opened = False - def write(self, data, relative_offset=(0, 0, 0)): + def write(self, data, relative_offset=(0, 0, 0), allow_compressed_write=False): was_opened = self._is_opened # assert the size of the parameter data is not in conflict with the attribute self.size assert_non_negative_offset(relative_offset) @@ -47,6 +47,9 @@ def write(self, data, relative_offset=(0, 0, 0)): sum(x) for x in zip(self.global_offset, relative_offset) ) + if self._is_compressed() and allow_compressed_write: + absolute_offset, data = self._handle_compressed_write(absolute_offset, data) + if not was_opened: self.open() @@ -95,7 +98,7 @@ def check_bounds(self, offset, size) -> bool: def assert_bounds(self, offset, size): if not self.check_bounds(offset, size): raise AssertionError( - f"Writing out of bounds: The passed parameter 'size' {size} exceeds the size of the current view ({self.size})" + f"Accessing data out of bounds: The passed parameter 'size' {size} exceeds the size of the current view ({self.size})" ) def for_each_chunk(self, work_on_chunk, job_args_per_chunk, chunk_size, executor): @@ -107,12 +110,9 @@ def for_each_chunk(self, work_on_chunk, job_args_per_chunk, chunk_size, executor chunk_size, chunk_size ): relative_offset = np.array(chunk.topleft) - np.array(self.global_offset) - job_args.append( - ( - self.get_view(size=chunk.size, relative_offset=relative_offset), - job_args_per_chunk, - ) - ) + view = self.get_view(size=chunk.size, relative_offset=relative_offset) + view.is_bounded = True + job_args.append((view, job_args_per_chunk)) # execute the work for each chunk wait_and_ensure_success(executor.map_to_futures(work_on_chunk, job_args)) @@ -120,6 +120,12 @@ def for_each_chunk(self, work_on_chunk, job_args_per_chunk, chunk_size, executor def _check_chunk_size(self, chunk_size): raise NotImplementedError + def _is_compressed(self): + return False + + def _handle_compressed_write(self, absolute_offset, data): + return absolute_offset, data + def __enter__(self): return self @@ -156,6 +162,51 @@ def _check_chunk_size(self, chunk_size): f"The passed parameter 'chunk_size' {chunk_size} must be a multiple of (32, 32, 32)." ) + def _is_compressed(self): + return ( + self.header.block_type == wkw.Header.BLOCK_TYPE_LZ4 + or self.header.block_type == wkw.Header.BLOCK_TYPE_LZ4HC + ) + + def _handle_compressed_write(self, absolute_offset, data): + # calculate aligned bounding box + file_bb = np.full(3, self.header.file_len * self.header.block_len) + absolute_offset_np = np.array(absolute_offset) + margin_to_top_left = absolute_offset_np % file_bb + aligned_offset = absolute_offset_np - margin_to_top_left + bottom_right = absolute_offset_np + np.array(data.shape[-3:]) + margin_to_bottom_right = file_bb - (bottom_right % file_bb) + aligned_bottom_right = bottom_right + margin_to_bottom_right + aligned_shape = aligned_bottom_right - aligned_offset + + if ( + tuple(aligned_offset) != absolute_offset + or tuple(aligned_shape) != data.shape[-3:] + ): + # the data is not aligned + # read the aligned bounding box + try: + aligned_data = self.read(offset=aligned_offset, size=aligned_shape) + except AssertionError as e: + raise AssertionError( + f"Writing compressed data failed. The compressed file is not fully inside the bounding box of the view (offset={self.global_offset}, size={self.size}). " + + str(e) + ) + index_slice = ( + slice(None, None), + *( + slice(start, end) + for start, end in zip( + margin_to_top_left, bottom_right - aligned_offset + ) + ), + ) + # overwrite the specified data + aligned_data[tuple(index_slice)] = data + return tuple(aligned_offset), aligned_data + else: + return absolute_offset, data + class TiffView(View): def open(self):