diff --git a/tests/test_dataset.py b/tests/test_dataset.py index 5b17e2c2d..25806e824 100644 --- a/tests/test_dataset.py +++ b/tests/test_dataset.py @@ -5,7 +5,7 @@ import tempfile from os.path import dirname, join from pathlib import Path -from typing import Any, Tuple, cast, Generator +from typing import Any, Tuple, cast, Generator, Union import pytest @@ -47,9 +47,12 @@ expected_error_msg = "The test did not throw an exception even though it should. " +TESTDATA_DIR = Path("testdata") +TESTOUTPUT_DIR = Path("testoutput") -def delete_dir(relative_path: str) -> None: - if path.exists(relative_path) and path.isdir(relative_path): + +def delete_dir(relative_path: Path) -> None: + if relative_path.exists() and relative_path.is_dir(): rmtree(relative_path) @@ -165,32 +168,32 @@ def get_multichanneled_data(dtype: type) -> np.ndarray: def test_create_wk_dataset_with_layer_and_mag() -> None: - delete_dir("./testoutput/wk_dataset") + delete_dir(TESTOUTPUT_DIR / "wk_dataset") - ds = WKDataset.create("./testoutput/wk_dataset", scale=(1, 1, 1)) + ds = WKDataset.create(TESTOUTPUT_DIR / "wk_dataset", scale=(1, 1, 1)) ds.add_layer("color", "color") ds.get_layer("color").add_mag("1") ds.get_layer("color").add_mag("2-2-1") - assert path.exists("./testoutput/wk_dataset/color/1") - assert path.exists("./testoutput/wk_dataset/color/2-2-1") + assert (TESTOUTPUT_DIR / "wk_dataset" / "color" / "1").exists() + assert (TESTOUTPUT_DIR / "wk_dataset" / "color" / "2-2-1").exists() assert len(ds.properties.data_layers) == 1 assert len(ds.properties.data_layers["color"].wkw_magnifications) == 2 def test_create_wk_dataset_with_explicit_header_fields() -> None: - delete_dir("./testoutput/wk_dataset_advanced") + delete_dir(TESTOUTPUT_DIR / "wk_dataset_advanced") - ds = WKDataset.create("./testoutput/wk_dataset_advanced", scale=(1, 1, 1)) + ds = WKDataset.create(TESTOUTPUT_DIR / "wk_dataset_advanced", scale=(1, 1, 1)) ds.add_layer("color", Layer.COLOR_TYPE, dtype_per_layer="uint48", num_channels=3) ds.get_layer("color").add_mag("1", block_len=64, file_len=64) ds.get_layer("color").add_mag("2-2-1") - assert path.exists("./testoutput/wk_dataset_advanced/color/1") - assert path.exists("./testoutput/wk_dataset_advanced/color/2-2-1") + assert (TESTOUTPUT_DIR / "wk_dataset_advanced" / "color" / "1").exists() + assert (TESTOUTPUT_DIR / "wk_dataset_advanced" / "color" / "2-2-1").exists() assert len(ds.properties.data_layers) == 1 assert len(ds.properties.data_layers["color"].wkw_magnifications) == 2 @@ -209,30 +212,30 @@ def test_create_wk_dataset_with_explicit_header_fields() -> None: def test_create_tiff_dataset_with_layer_and_mag() -> None: # This test would be the same for WKDataset - delete_dir("./testoutput/tiff_dataset") + delete_dir(TESTOUTPUT_DIR / "tiff_dataset") - ds = WKDataset.create("./testoutput/tiff_dataset", scale=(1, 1, 1)) + ds = WKDataset.create(TESTOUTPUT_DIR / "tiff_dataset", scale=(1, 1, 1)) ds.add_layer("color", Layer.COLOR_TYPE) ds.get_layer("color").add_mag("1") ds.get_layer("color").add_mag("2-2-1") - assert path.exists("./testoutput/tiff_dataset/color/1") - assert path.exists("./testoutput/tiff_dataset/color/2-2-1") + assert (TESTOUTPUT_DIR / "tiff_dataset" / "color" / "1").exists() + assert (TESTOUTPUT_DIR / "tiff_dataset" / "color" / "2-2-1").exists() assert len(ds.properties.data_layers) == 1 assert len(ds.properties.data_layers["color"].wkw_magnifications) == 2 def test_open_wk_dataset() -> None: - ds = WKDataset("./testdata/simple_wk_dataset") + ds = WKDataset(TESTDATA_DIR / "simple_wk_dataset") 
assert len(ds.properties.data_layers) == 1 assert len(ds.properties.data_layers["color"].wkw_magnifications) == 1 def test_open_tiff_dataset() -> None: - ds = TiffDataset("./testdata/simple_tiff_dataset") + ds = TiffDataset(TESTDATA_DIR / "simple_tiff_dataset") assert len(ds.properties.data_layers) == 1 assert len(ds.properties.data_layers["color"].wkw_magnifications) == 1 @@ -240,7 +243,7 @@ def test_open_tiff_dataset() -> None: def test_view_read_with_open() -> None: - wk_view = WKDataset("./testdata/simple_wk_dataset/").get_view( + wk_view = WKDataset(TESTDATA_DIR / "simple_wk_dataset").get_view( "color", "1", size=(16, 16, 16) ) @@ -257,7 +260,7 @@ def test_view_read_with_open() -> None: def test_tiff_mag_read_with_open() -> None: - tiff_dataset = TiffDataset("./testdata/simple_tiff_dataset/") + tiff_dataset = TiffDataset(TESTDATA_DIR / "simple_tiff_dataset") layer = tiff_dataset.get_layer("color") mag = layer.get_mag("1") mag.open() @@ -268,7 +271,7 @@ def test_tiff_mag_read_with_open() -> None: def test_view_read_without_open() -> None: # This test would be the same for TiffDataset - wk_view = WKDataset("./testdata/simple_wk_dataset/").get_view( + wk_view = WKDataset(TESTDATA_DIR / "simple_wk_dataset").get_view( "color", "1", size=(16, 16, 16) ) @@ -282,10 +285,10 @@ def test_view_read_without_open() -> None: def test_view_wk_write() -> None: - delete_dir("./testoutput/simple_wk_dataset/") - copytree("./testdata/simple_wk_dataset/", "./testoutput/simple_wk_dataset/") + delete_dir(TESTOUTPUT_DIR / "simple_wk_dataset") + copytree(TESTDATA_DIR / "simple_wk_dataset", TESTOUTPUT_DIR / "simple_wk_dataset") - wk_view = WKDataset("./testoutput/simple_wk_dataset/").get_view( + wk_view = WKDataset(TESTOUTPUT_DIR / "simple_wk_dataset").get_view( "color", "1", size=(16, 16, 16) ) @@ -300,10 +303,12 @@ def test_view_wk_write() -> None: def test_view_tiff_write() -> None: - delete_dir("./testoutput/simple_tiff_dataset/") - copytree("./testdata/simple_tiff_dataset/", "./testoutput/simple_tiff_dataset/") + delete_dir(TESTOUTPUT_DIR / "simple_tiff_dataset") + copytree( + TESTDATA_DIR / "simple_tiff_dataset", TESTOUTPUT_DIR / "simple_tiff_dataset" + ) - tiff_view = TiffDataset("./testoutput/simple_tiff_dataset/").get_view( + tiff_view = TiffDataset(TESTOUTPUT_DIR / "simple_tiff_dataset").get_view( "color", "1", size=(16, 16, 10) ) @@ -319,10 +324,10 @@ def test_view_tiff_write() -> None: def test_view_tiff_write_out_of_bounds() -> None: - new_dataset_path = "./testoutput/tiff_view_dataset_out_of_bounds/" + new_dataset_path = TESTOUTPUT_DIR / "tiff_view_dataset_out_of_bounds" delete_dir(new_dataset_path) - copytree("./testdata/simple_tiff_dataset/", new_dataset_path) + copytree(TESTDATA_DIR / "simple_tiff_dataset", new_dataset_path) tiff_view = TiffDataset(new_dataset_path).get_view( "color", "1", size=(100, 100, 10) @@ -341,10 +346,10 @@ def test_view_tiff_write_out_of_bounds() -> None: def test_view_wk_write_out_of_bounds() -> None: - new_dataset_path = "./testoutput/wk_view_dataset_out_of_bounds/" + new_dataset_path = TESTOUTPUT_DIR / "wk_view_dataset_out_of_bounds" delete_dir(new_dataset_path) - copytree("./testdata/simple_wk_dataset/", new_dataset_path) + copytree(TESTDATA_DIR / "simple_wk_dataset", new_dataset_path) tiff_view = WKDataset(new_dataset_path).get_view("color", "1", size=(16, 16, 16)) @@ -363,7 +368,7 @@ def test_view_wk_write_out_of_bounds() -> None: def test_wk_view_out_of_bounds() -> None: try: # The size of the mag is (24, 24, 24). 
Trying to get an bigger view should throw an error - WKDataset("./testdata/simple_wk_dataset/").get_view( + WKDataset(TESTDATA_DIR / "simple_wk_dataset").get_view( "color", "1", size=(100, 100, 100) ) raise Exception( @@ -376,7 +381,7 @@ def test_wk_view_out_of_bounds() -> None: def test_tiff_view_out_of_bounds() -> None: try: # The size of the mag is (24, 24, 24). Trying to get an bigger view should throw an error - TiffDataset("./testdata/simple_tiff_dataset/").get_view( + TiffDataset(TESTDATA_DIR / "simple_tiff_dataset").get_view( "color", "1", size=(100, 100, 100) ) raise Exception( @@ -387,10 +392,10 @@ def test_tiff_view_out_of_bounds() -> None: def test_tiff_write_out_of_bounds() -> None: - new_dataset_path = "./testoutput/simple_tiff_dataset_out_of_bounds/" + new_dataset_path = TESTOUTPUT_DIR / "simple_tiff_dataset_out_of_bounds" delete_dir(new_dataset_path) - copytree("./testdata/simple_tiff_dataset/", new_dataset_path) + copytree(TESTDATA_DIR / "simple_tiff_dataset", new_dataset_path) ds = TiffDataset(new_dataset_path) mag_dataset = ds.get_layer("color").get_mag("1") @@ -403,10 +408,10 @@ def test_tiff_write_out_of_bounds() -> None: def test_wk_write_out_of_bounds() -> None: - new_dataset_path = "./testoutput/simple_wk_dataset_out_of_bounds/" + new_dataset_path = TESTOUTPUT_DIR / "simple_wk_dataset_out_of_bounds" delete_dir(new_dataset_path) - copytree("./testdata/simple_wk_dataset/", new_dataset_path) + copytree(TESTDATA_DIR / "simple_wk_dataset", new_dataset_path) ds = WKDataset(new_dataset_path) mag_dataset = ds.get_layer("color").get_mag("1") @@ -419,10 +424,10 @@ def test_wk_write_out_of_bounds() -> None: def test_wk_write_out_of_bounds_mag2() -> None: - new_dataset_path = "./testoutput/simple_wk_dataset_out_of_bounds/" + new_dataset_path = TESTOUTPUT_DIR / "simple_wk_dataset_out_of_bounds" delete_dir(new_dataset_path) - copytree("./testdata/simple_wk_dataset/", new_dataset_path) + copytree(TESTDATA_DIR / "simple_wk_dataset", new_dataset_path) ds = WKDataset(new_dataset_path) mag_dataset = ds.get_layer("color").get_or_add_mag("2-2-1") @@ -439,9 +444,9 @@ def test_wk_write_out_of_bounds_mag2() -> None: def test_update_new_bounding_box_offset() -> None: # This test would be the same for WKDataset - delete_dir("./testoutput/tiff_dataset") + delete_dir(TESTOUTPUT_DIR / "tiff_dataset") - ds = TiffDataset.create("./testoutput/tiff_dataset", scale=(1, 1, 1)) + ds = TiffDataset.create(TESTOUTPUT_DIR / "tiff_dataset", scale=(1, 1, 1)) mag = ds.add_layer("color", Layer.COLOR_TYPE).add_mag("1") assert ds.properties.data_layers["color"].bounding_box["topLeft"] == (-1, -1, -1) @@ -463,10 +468,10 @@ def test_other_file_extensions_for_tiff_dataset() -> None: # The TiffDataset also works with other file extensions (in this case .png) # It also works with .jpg but this format uses lossy compression - delete_dir("./testoutput/png_dataset") + delete_dir(TESTOUTPUT_DIR / "png_dataset") ds = TiffDataset.create( - "./testoutput/png_dataset", scale=(1, 1, 1), pattern="{zzz}.png" + TESTOUTPUT_DIR / "png_dataset", scale=(1, 1, 1), pattern="{zzz}.png" ) mag = ds.add_layer("color", Layer.COLOR_TYPE).add_mag("1") @@ -477,7 +482,7 @@ def test_other_file_extensions_for_tiff_dataset() -> None: def test_tiff_write_multi_channel_uint8() -> None: - dataset_path = "./testoutput/tiff_multichannel/" + dataset_path = TESTOUTPUT_DIR / "tiff_multichannel" delete_dir(dataset_path) ds_tiff = TiffDataset.create(dataset_path, scale=(1, 1, 1)) @@ -492,7 +497,7 @@ def test_tiff_write_multi_channel_uint8() -> None: def 
test_wk_write_multi_channel_uint8() -> None: - dataset_path = "./testoutput/wk_multichannel/" + dataset_path = TESTOUTPUT_DIR / "wk_multichannel" delete_dir(dataset_path) ds_tiff = WKDataset.create(dataset_path, scale=(1, 1, 1)) @@ -507,7 +512,7 @@ def test_wk_write_multi_channel_uint8() -> None: def test_tiff_write_multi_channel_uint16() -> None: - dataset_path = "./testoutput/tiff_multichannel/" + dataset_path = TESTOUTPUT_DIR / "tiff_multichannel" delete_dir(dataset_path) ds_tiff = TiffDataset.create(dataset_path, scale=(1, 1, 1)) @@ -527,7 +532,7 @@ def test_tiff_write_multi_channel_uint16() -> None: def test_wk_write_multi_channel_uint16() -> None: - dataset_path = "./testoutput/wk_multichannel/" + dataset_path = TESTOUTPUT_DIR / "wk_multichannel" delete_dir(dataset_path) ds_tiff = WKDataset.create(dataset_path, scale=(1, 1, 1)) @@ -545,7 +550,7 @@ def test_wk_write_multi_channel_uint16() -> None: def test_wkw_empty_read() -> None: - filename = "./testoutput/empty_wk_dataset" + filename = TESTOUTPUT_DIR / "empty_wk_dataset" delete_dir(filename) mag = ( @@ -559,7 +564,7 @@ def test_wkw_empty_read() -> None: def test_tiff_empty_read() -> None: - filename = "./testoutput/empty_tiff_dataset" + filename = TESTOUTPUT_DIR / "empty_tiff_dataset" delete_dir(filename) mag = ( @@ -573,7 +578,7 @@ def test_tiff_empty_read() -> None: def test_tiff_read_padded_data() -> None: - filename = "./testoutput/empty_tiff_dataset" + filename = TESTOUTPUT_DIR / "empty_tiff_dataset" delete_dir(filename) mag = ( @@ -589,7 +594,7 @@ def test_tiff_read_padded_data() -> None: def test_wk_read_padded_data() -> None: - filename = "./testoutput/empty_wk_dataset" + filename = TESTOUTPUT_DIR / "empty_wk_dataset" delete_dir(filename) mag = ( @@ -605,13 +610,15 @@ def test_wk_read_padded_data() -> None: def test_read_and_write_of_properties() -> None: - destination_path = "./testoutput/read_write_properties/" + destination_path = TESTOUTPUT_DIR / "read_write_properties" delete_dir(destination_path) - source_file_name = "./testdata/simple_tiff_dataset/datasource-properties.json" - destination_file_name = destination_path + "datasource-properties.json" + source_file_name = ( + TESTDATA_DIR / "simple_tiff_dataset" / "datasource-properties.json" + ) + destination_file_name = destination_path / "datasource-properties.json" imported_properties = TiffProperties._from_json(source_file_name) - imported_properties._path = destination_file_name + imported_properties._path = str(destination_file_name) makedirs(destination_path) imported_properties._export_as_json() @@ -619,9 +626,9 @@ def test_read_and_write_of_properties() -> None: def test_num_channel_mismatch_assertion() -> None: - delete_dir("./testoutput/wk_dataset") + delete_dir(TESTOUTPUT_DIR / "wk_dataset") - ds = WKDataset.create("./testoutput/wk_dataset", scale=(1, 1, 1)) + ds = WKDataset.create(TESTOUTPUT_DIR / "wk_dataset", scale=(1, 1, 1)) mag = ds.add_layer("color", Layer.COLOR_TYPE, num_channels=1).add_mag( "1" ) # num_channel=1 is also the default @@ -641,9 +648,9 @@ def test_num_channel_mismatch_assertion() -> None: def test_get_or_add_layer() -> None: # This test would be the same for TiffDataset - delete_dir("./testoutput/wk_dataset") + delete_dir(TESTOUTPUT_DIR / "wk_dataset") - ds = WKDataset.create("./testoutput/wk_dataset", scale=(1, 1, 1)) + ds = WKDataset.create(TESTOUTPUT_DIR / "wk_dataset", scale=(1, 1, 1)) assert "color" not in ds.layers.keys() @@ -675,9 +682,9 @@ def test_get_or_add_layer() -> None: def test_get_or_add_mag_for_wk() -> None: - 
delete_dir("./testoutput/wk_dataset") + delete_dir(TESTOUTPUT_DIR / "wk_dataset") - layer = WKDataset.create("./testoutput/wk_dataset", scale=(1, 1, 1)).add_layer( + layer = WKDataset.create(TESTOUTPUT_DIR / "wk_dataset", scale=(1, 1, 1)).add_layer( "color", Layer.COLOR_TYPE ) @@ -705,11 +712,11 @@ def test_get_or_add_mag_for_wk() -> None: def test_get_or_add_mag_for_tiff() -> None: - delete_dir("./testoutput/wk_dataset") + delete_dir(TESTOUTPUT_DIR / "wk_dataset") - layer = TiffDataset.create("./testoutput/wk_dataset", scale=(1, 1, 1)).add_layer( - "color", Layer.COLOR_TYPE - ) + layer = TiffDataset.create( + TESTOUTPUT_DIR / "wk_dataset", scale=(1, 1, 1) + ).add_layer("color", Layer.COLOR_TYPE) assert "1" not in layer.mags.keys() @@ -725,9 +732,9 @@ def test_get_or_add_mag_for_tiff() -> None: def test_tiled_tiff_read_and_write_multichannel() -> None: - delete_dir("./testoutput/TiledTiffDataset") + delete_dir(TESTOUTPUT_DIR / "TiledTiffDataset") tiled_tiff_ds = TiledTiffDataset.create( - "./testoutput/TiledTiffDataset", + TESTOUTPUT_DIR / "TiledTiffDataset", scale=(1, 1, 1), tile_size=(32, 64), pattern="{xxx}_{yyy}_{zzz}.tif", @@ -746,9 +753,9 @@ def test_tiled_tiff_read_and_write_multichannel() -> None: def test_tiled_tiff_read_and_write() -> None: - delete_dir("./testoutput/tiled_tiff_dataset") + delete_dir(TESTOUTPUT_DIR / "tiled_tiff_dataset") tiled_tiff_ds = TiledTiffDataset.create( - "./testoutput/tiled_tiff_dataset", + TESTOUTPUT_DIR / "tiled_tiff_dataset", scale=(1, 1, 1), tile_size=(32, 64), pattern="{xxx}_{yyy}_{zzz}.tif", @@ -770,40 +777,44 @@ def test_tiled_tiff_read_and_write() -> None: assert mag.get_tile(1, 1, 6).shape == (1, 32, 64, 1) assert np.array_equal( mag.get_tile(1, 2, 6)[0, :, :, 0], - TiffReader("./testoutput/tiled_tiff_dataset/color/1/001_002_006.tif").read(), + TiffReader( + TESTOUTPUT_DIR / "tiled_tiff_dataset" / "color" / "1" / "001_002_006.tif" + ).read(), ) assert np.array_equal( data[(32 * 1) - 5 : (32 * 2) - 5, (64 * 2) - 5 : (64 * 3) - 5, 6], - TiffReader("./testoutput/tiled_tiff_dataset/color/1/001_002_006.tif").read(), + TiffReader( + TESTOUTPUT_DIR / "tiled_tiff_dataset" / "color" / "1" / "001_002_006.tif" + ).read(), ) def test_open_dataset_without_num_channels_in_properties() -> None: - delete_dir("./testoutput/old_wk_dataset/") - copytree("./testdata/old_wk_dataset/", "./testoutput/old_wk_dataset/") + delete_dir(TESTOUTPUT_DIR / "old_wk_dataset") + copytree(TESTDATA_DIR / "old_wk_dataset", TESTOUTPUT_DIR / "old_wk_dataset") with open( - "./testoutput/old_wk_dataset/datasource-properties.json" + TESTOUTPUT_DIR / "old_wk_dataset" / "datasource-properties.json" ) as datasource_properties: data = json.load(datasource_properties) assert data["dataLayers"][0].get("num_channels") is None - ds = WKDataset("./testoutput/old_wk_dataset/") + ds = WKDataset(TESTOUTPUT_DIR / "old_wk_dataset") assert ds.properties.data_layers["color"].num_channels == 1 ds.properties._export_as_json() with open( - "./testoutput/old_wk_dataset/datasource-properties.json" + TESTOUTPUT_DIR / "old_wk_dataset" / "datasource-properties.json" ) as datasource_properties: data = json.load(datasource_properties) assert data["dataLayers"][0].get("num_channels") == 1 def test_advanced_pattern() -> None: - delete_dir("./testoutput/tiff_dataset_advanced_pattern") + delete_dir(TESTOUTPUT_DIR / "tiff_dataset_advanced_pattern") ds = TiledTiffDataset.create( - "./testoutput/tiff_dataset_advanced_pattern", + TESTOUTPUT_DIR / "tiff_dataset_advanced_pattern", scale=(1, 1, 1), tile_size=(32, 32), 
pattern="{xxxx}/{yyyy}/{zzzz}.tif", @@ -817,10 +828,10 @@ def test_advanced_pattern() -> None: def test_invalid_pattern() -> None: - delete_dir("./testoutput/tiff_invalid_dataset") + delete_dir(TESTOUTPUT_DIR / "tiff_invalid_dataset") try: TiledTiffDataset.create( - "./testoutput/tiff_invalid_dataset", + TESTOUTPUT_DIR / "tiff_invalid_dataset", scale=(1, 1, 1), tile_size=(32, 32), pattern="{xxxx}/{yyyy}/{zzzz.tif", @@ -833,7 +844,7 @@ def test_invalid_pattern() -> None: try: TiledTiffDataset.create( - "./testoutput/tiff_invalid_dataset", + TESTOUTPUT_DIR / "tiff_invalid_dataset", scale=(1, 1, 1), tile_size=(32, 32), pattern="zzzz.tif", @@ -846,7 +857,7 @@ def test_invalid_pattern() -> None: def test_largest_segment_id_requirement() -> None: - path = "./testoutput/largest_segment_id" + path = TESTOUTPUT_DIR / "largest_segment_id" delete_dir(path) ds = WKDataset.create(path, scale=(10, 10, 10)) @@ -866,8 +877,12 @@ def test_largest_segment_id_requirement() -> None: def test_properties_with_segmentation() -> None: - input_json_path = "./testdata/complex_property_ds/datasource-properties.json" - output_json_path = "./testoutput/complex_property_ds/datasource-properties.json" + input_json_path = ( + TESTDATA_DIR / "complex_property_ds" / "datasource-properties.json" + ) + output_json_path = ( + TESTOUTPUT_DIR / "complex_property_ds" / "datasource-properties.json" + ) properties = WKProperties._from_json(input_json_path) # the attributes 'largest_segment_id' and 'mappings' only exist if it is a SegmentationLayer @@ -883,7 +898,7 @@ def test_properties_with_segmentation() -> None: # export the json under a new name makedirs(dirname(output_json_path), exist_ok=True) - properties._path = output_json_path + properties._path = str(output_json_path) properties._export_as_json() # validate if contents match @@ -900,10 +915,10 @@ def test_properties_with_segmentation() -> None: def test_chunking_wk() -> None: - delete_dir("./testoutput/chunking_dataset_wk/") - copytree("./testdata/simple_wk_dataset/", "./testoutput/chunking_dataset_wk/") + delete_dir(TESTOUTPUT_DIR / "chunking_dataset_wk") + copytree(TESTDATA_DIR / "simple_wk_dataset", TESTOUTPUT_DIR / "chunking_dataset_wk") - view = WKDataset("./testoutput/chunking_dataset_wk/").get_view( + view = WKDataset(TESTOUTPUT_DIR / "chunking_dataset_wk").get_view( "color", "1", size=(256, 256, 256), is_bounded=False ) @@ -920,12 +935,13 @@ def test_chunking_wk() -> None: def test_chunking_wk_advanced() -> None: - delete_dir("./testoutput/chunking_dataset_wk_advanced/") + delete_dir(TESTOUTPUT_DIR / "chunking_dataset_wk_advanced") copytree( - "./testdata/simple_wk_dataset/", "./testoutput/chunking_dataset_wk_advanced/" + TESTDATA_DIR / "simple_wk_dataset", + TESTOUTPUT_DIR / "chunking_dataset_wk_advanced", ) - ds = WKDataset("./testoutput/chunking_dataset_wk_advanced/") + ds = WKDataset(TESTOUTPUT_DIR / "chunking_dataset_wk_advanced") view = ds.get_view( "color", "1", size=(150, 150, 54), offset=(10, 10, 10), is_bounded=False ) @@ -933,24 +949,26 @@ def test_chunking_wk_advanced() -> None: def test_chunking_wk_wrong_chunk_size() -> None: - delete_dir("./testoutput/chunking_dataset_wk_with_wrong_chunk_size/") + delete_dir(TESTOUTPUT_DIR / "chunking_dataset_wk_with_wrong_chunk_size") copytree( - "./testdata/simple_wk_dataset/", - "./testoutput/chunking_dataset_wk_with_wrong_chunk_size/", + TESTDATA_DIR / "simple_wk_dataset", + TESTOUTPUT_DIR / "chunking_dataset_wk_with_wrong_chunk_size", ) view = WKDataset( - 
"./testoutput/chunking_dataset_wk_with_wrong_chunk_size/" + TESTOUTPUT_DIR / "chunking_dataset_wk_with_wrong_chunk_size" ).get_view("color", "1", size=(256, 256, 256), is_bounded=False) for_each_chunking_with_wrong_chunk_size(view) def test_chunking_tiff() -> None: - delete_dir("./testoutput/chunking_dataset_tiff/") - copytree("./testdata/simple_tiff_dataset/", "./testoutput/chunking_dataset_tiff/") + delete_dir(TESTOUTPUT_DIR / "chunking_dataset_tiff") + copytree( + TESTDATA_DIR / "simple_tiff_dataset", TESTOUTPUT_DIR / "chunking_dataset_tiff" + ) - view = TiffDataset("./testoutput/chunking_dataset_tiff/").get_view( + view = TiffDataset(TESTOUTPUT_DIR / "chunking_dataset_tiff").get_view( "color", "1", size=(265, 265, 10) ) @@ -968,24 +986,24 @@ def test_chunking_tiff() -> None: def test_chunking_tiff_wrong_chunk_size() -> None: - delete_dir("./testoutput/chunking_dataset_tiff_with_wrong_chunk_size/") + delete_dir(TESTOUTPUT_DIR / "chunking_dataset_tiff_with_wrong_chunk_size") copytree( - "./testdata/simple_tiff_dataset/", - "./testoutput/chunking_dataset_tiff_with_wrong_chunk_size/", + TESTDATA_DIR / "simple_tiff_dataset", + TESTOUTPUT_DIR / "chunking_dataset_tiff_with_wrong_chunk_size", ) view = TiffDataset( - "./testoutput/chunking_dataset_tiff_with_wrong_chunk_size/" + TESTOUTPUT_DIR / "chunking_dataset_tiff_with_wrong_chunk_size" ).get_view("color", "1", size=(256, 256, 256), is_bounded=False) for_each_chunking_with_wrong_chunk_size(view) def test_chunking_tiled_tiff_wrong_chunk_size() -> None: - delete_dir("./testoutput/chunking_dataset_tiled_tiff_with_wrong_chunk_size/") + delete_dir(TESTOUTPUT_DIR / "chunking_dataset_tiled_tiff_with_wrong_chunk_size") ds = TiledTiffDataset.create( - "./testoutput/chunking_dataset_tiled_tiff_with_wrong_chunk_size/", + TESTOUTPUT_DIR / "chunking_dataset_tiled_tiff_with_wrong_chunk_size", scale=(1, 1, 1), tile_size=(32, 32), pattern="{xxxx}/{yyyy}/{zzzz}.tif", @@ -997,13 +1015,13 @@ def test_chunking_tiled_tiff_wrong_chunk_size() -> None: def test_chunking_tiled_tiff_advanced() -> None: - delete_dir("./testoutput/chunking_dataset_tiled_tiff_advanced/") + delete_dir(TESTOUTPUT_DIR / "chunking_dataset_tiled_tiff_advanced") copytree( - "./testdata/simple_wk_dataset/", - "./testoutput/chunking_dataset_tiled_tiff_advanced/", + TESTDATA_DIR / "simple_wk_dataset", + TESTOUTPUT_DIR / "chunking_dataset_tiled_tiff_advanced", ) - ds = WKDataset("./testoutput/chunking_dataset_tiled_tiff_advanced/") + ds = WKDataset(TESTOUTPUT_DIR / "chunking_dataset_tiled_tiff_advanced") view = ds.get_view( "color", "1", size=(150, 150, 54), offset=(10, 10, 10), is_bounded=False ) @@ -1012,9 +1030,9 @@ def test_chunking_tiled_tiff_advanced() -> None: def test_tiled_tiff_inverse_pattern() -> None: - delete_dir("./testoutput/tiled_tiff_dataset_inverse") + delete_dir(TESTOUTPUT_DIR / "tiled_tiff_dataset_inverse") tiled_tiff_ds = TiledTiffDataset.create( - "./testoutput/tiled_tiff_dataset_inverse", + TESTOUTPUT_DIR / "tiled_tiff_dataset_inverse", scale=(1, 1, 1), tile_size=(32, 64), pattern="{zzz}/{xxx}/{yyy}.tif", @@ -1040,24 +1058,36 @@ def test_tiled_tiff_inverse_pattern() -> None: assert np.array_equal( mag.get_tile(1, 2, 6)[0, :, :, 0], TiffReader( - "./testoutput/tiled_tiff_dataset_inverse/color/1/006/001/002.tif" + TESTOUTPUT_DIR + / "tiled_tiff_dataset_inverse" + / "color" + / "1" + / "006" + / "001" + / "002.tif" ).read(), ) assert np.array_equal( data[(32 * 1) - 5 : (32 * 2) - 5, (64 * 2) - 5 : (64 * 3) - 5, 6], TiffReader( - 
"./testoutput/tiled_tiff_dataset_inverse/color/1/006/001/002.tif" + TESTOUTPUT_DIR + / "tiled_tiff_dataset_inverse" + / "color" + / "1" + / "006" + / "001" + / "002.tif" ).read(), ) def test_view_write_without_open() -> None: # This test would be the same for TiffDataset + ds_path = TESTOUTPUT_DIR / "wk_dataset_write_without_open" + delete_dir(ds_path) - delete_dir("./testoutput/wk_dataset_write_without_open") - - ds = WKDataset.create("./testoutput/wk_dataset_write_without_open", scale=(1, 1, 1)) + ds = WKDataset.create(ds_path, scale=(1, 1, 1)) ds.add_layer("color", Layer.COLOR_TYPE) ds.get_layer("color").add_mag("1") @@ -1073,7 +1103,7 @@ def test_view_write_without_open() -> None: def test_typing_of_get_mag() -> None: - ds = WKDataset("./testdata/simple_wk_dataset") + ds = WKDataset(TESTDATA_DIR / "simple_wk_dataset") layer = ds.get_layer("color") assert layer.get_mag("1") == layer.get_mag(1) assert layer.get_mag("1") == layer.get_mag((1, 1, 1)) @@ -1083,54 +1113,44 @@ def test_typing_of_get_mag() -> None: def test_wk_dataset_get_or_create() -> None: - delete_dir("./testoutput/wk_dataset_get_or_create") + ds_path = TESTOUTPUT_DIR / "wk_dataset_get_or_create" + delete_dir(ds_path) # dataset does not exists yet - ds1 = WKDataset.get_or_create( - "./testoutput/wk_dataset_get_or_create", scale=(1, 1, 1) - ) + ds1 = WKDataset.get_or_create(ds_path, scale=(1, 1, 1)) assert "color" not in ds1.layers.keys() ds1.add_layer("color", Layer.COLOR_TYPE) assert "color" in ds1.layers.keys() # dataset already exists - ds2 = WKDataset.get_or_create( - "./testoutput/wk_dataset_get_or_create", scale=(1, 1, 1) - ) + ds2 = WKDataset.get_or_create(ds_path, scale=(1, 1, 1)) assert "color" in ds2.layers.keys() try: # dataset already exists, but with a different scale - WKDataset.get_or_create( - "./testoutput/wk_dataset_get_or_create", scale=(2, 2, 2) - ) + WKDataset.get_or_create(ds_path, scale=(2, 2, 2)) raise Exception(expected_error_msg) except AssertionError: pass def test_tiff_dataset_get_or_create() -> None: - delete_dir("./testoutput/tiff_dataset_get_or_create") + ds_path = TESTOUTPUT_DIR / "tiff_dataset_get_or_create" + delete_dir(ds_path) # dataset does not exists yet - ds1 = TiffDataset.get_or_create( - "./testoutput/tiff_dataset_get_or_create", scale=(1, 1, 1) - ) + ds1 = TiffDataset.get_or_create(ds_path, scale=(1, 1, 1)) assert "color" not in ds1.layers.keys() ds1.add_layer("color", Layer.COLOR_TYPE) assert "color" in ds1.layers.keys() # dataset already exists - ds2 = TiffDataset.get_or_create( - "./testoutput/tiff_dataset_get_or_create", scale=(1, 1, 1) - ) + ds2 = TiffDataset.get_or_create(ds_path, scale=(1, 1, 1)) assert "color" in ds2.layers.keys() try: # dataset already exists, but with a different scale - TiffDataset.get_or_create( - "./testoutput/tiff_dataset_get_or_create", scale=(2, 2, 2) - ) + TiffDataset.get_or_create(ds_path, scale=(2, 2, 2)) raise Exception(expected_error_msg) except AssertionError: pass @@ -1138,7 +1158,7 @@ def test_tiff_dataset_get_or_create() -> None: try: # dataset already exists, but with a different pattern TiffDataset.get_or_create( - "./testoutput/tiff_dataset_get_or_create", + ds_path, scale=(1, 1, 1), pattern="ds_{zzz}.tif", ) @@ -1148,11 +1168,12 @@ def test_tiff_dataset_get_or_create() -> None: def test_tiled_tiff_dataset_get_or_create() -> None: - delete_dir("./testoutput/tiled_tiff_dataset_get_or_create") + delete_dir(TESTOUTPUT_DIR / "tiled_tiff_dataset_get_or_create") + ds_path = TESTOUTPUT_DIR / "tiled_tiff_dataset_get_or_create" # dataset does not 
exists yet ds1 = TiledTiffDataset.get_or_create( - "./testoutput/tiled_tiff_dataset_get_or_create", + ds_path, scale=(1, 1, 1), tile_size=(32, 64), ) @@ -1162,7 +1183,7 @@ def test_tiled_tiff_dataset_get_or_create() -> None: # dataset already exists ds2 = TiledTiffDataset.get_or_create( - "./testoutput/tiled_tiff_dataset_get_or_create", + ds_path, scale=(1, 1, 1), tile_size=(32, 64), ) @@ -1171,7 +1192,7 @@ def test_tiled_tiff_dataset_get_or_create() -> None: try: # dataset already exists, but with a different scale TiledTiffDataset.get_or_create( - "./testoutput/tiled_tiff_dataset_get_or_create", + ds_path, scale=(2, 2, 2), tile_size=(32, 64), ) @@ -1182,7 +1203,7 @@ def test_tiled_tiff_dataset_get_or_create() -> None: try: # dataset already exists, but with a different tile_size TiledTiffDataset.get_or_create( - "./testoutput/tiled_tiff_dataset_get_or_create", + TESTOUTPUT_DIR / "tiled_tiff_dataset_get_or_create", scale=(1, 1, 1), tile_size=(100, 100), ) @@ -1193,7 +1214,7 @@ def test_tiled_tiff_dataset_get_or_create() -> None: try: # dataset already exists, but with a different pattern TiledTiffDataset.get_or_create( - "./testoutput/tiled_tiff_dataset_get_or_create", + TESTOUTPUT_DIR / "tiled_tiff_dataset_get_or_create", scale=(1, 1, 1), tile_size=(32, 64), pattern="ds_{zzz}.tif", @@ -1204,13 +1225,13 @@ def test_tiled_tiff_dataset_get_or_create() -> None: def test_changing_layer_bounding_box() -> None: - delete_dir("./testoutput/test_changing_layer_bounding_box/") + delete_dir(TESTOUTPUT_DIR / "test_changing_layer_bounding_box") copytree( - "./testdata/simple_tiff_dataset/", - "./testoutput/test_changing_layer_bounding_box/", + TESTDATA_DIR / "simple_tiff_dataset", + TESTOUTPUT_DIR / "test_changing_layer_bounding_box", ) - ds = TiffDataset("./testoutput/test_changing_layer_bounding_box/") + ds = TiffDataset(TESTOUTPUT_DIR / "test_changing_layer_bounding_box") layer = ds.get_layer("color") mag = layer.get_mag("1") @@ -1253,9 +1274,9 @@ def test_changing_layer_bounding_box() -> None: def test_view_offsets() -> None: - delete_dir("./testoutput/wk_offset_tests") + delete_dir(TESTOUTPUT_DIR / "wk_offset_tests") - ds = WKDataset.create("./testoutput/wk_offset_tests", scale=(1, 1, 1)) + ds = WKDataset.create(TESTOUTPUT_DIR / "wk_offset_tests", scale=(1, 1, 1)) mag = ds.add_layer("color", Layer.COLOR_TYPE).add_mag("1") # The dataset is new -> no data has been written. 
@@ -1325,9 +1346,9 @@ def test_view_offsets() -> None: def test_adding_layer_with_invalid_dtype_per_layer() -> None: - delete_dir("./testoutput/invalid_dtype") + delete_dir(TESTOUTPUT_DIR / "invalid_dtype") - ds = WKDataset.create("./testoutput/invalid_dtype", scale=(1, 1, 1)) + ds = WKDataset.create(TESTOUTPUT_DIR / "invalid_dtype", scale=(1, 1, 1)) with pytest.raises(TypeError): # this would lead to a dtype_per_channel of "uint10", but that is not a valid dtype ds.add_layer( @@ -1342,9 +1363,9 @@ def test_adding_layer_with_invalid_dtype_per_layer() -> None: def test_adding_layer_with_valid_dtype_per_layer() -> None: - delete_dir("./testoutput/valid_dtype") + delete_dir(TESTOUTPUT_DIR / "valid_dtype") - ds = WKDataset.create("./testoutput/valid_dtype", scale=(1, 1, 1)) + ds = WKDataset.create(TESTOUTPUT_DIR / "valid_dtype", scale=(1, 1, 1)) ds.add_layer("color1", Layer.COLOR_TYPE, dtype_per_layer="uint24", num_channels=3) ds.add_layer("color2", Layer.COLOR_TYPE, dtype_per_layer=np.uint8, num_channels=1) ds.add_layer("color3", Layer.COLOR_TYPE, dtype_per_channel=np.uint8, num_channels=3) @@ -1352,28 +1373,24 @@ def test_adding_layer_with_valid_dtype_per_layer() -> None: def test_writing_subset_of_compressed_data_multi_channel() -> None: - delete_dir("./testoutput/compressed_data/") + delete_dir(TESTOUTPUT_DIR / "compressed_data") # create uncompressed dataset write_data1 = (np.random.rand(3, 20, 40, 60) * 255).astype(np.uint8) - WKDataset.create( - os.path.abspath("./testoutput/compressed_data"), scale=(1, 1, 1) - ).add_layer("color", Layer.COLOR_TYPE, num_channels=3).add_mag( - "1", block_len=8, file_len=8 - ).write( - write_data1 - ) + WKDataset.create(TESTOUTPUT_DIR / "compressed_data", scale=(1, 1, 1)).add_layer( + "color", Layer.COLOR_TYPE, num_channels=3 + ).add_mag("1", block_len=8, file_len=8).write(write_data1) # compress data compress_mag_inplace( - os.path.abspath("./testoutput/compressed_data/"), + (TESTOUTPUT_DIR / "compressed_data").resolve(), layer_name="color", mag=Mag("1"), ) # open compressed dataset compressed_mag = ( - WKDataset("./testoutput/compressed_data").get_layer("color").get_mag("1") + WKDataset(TESTOUTPUT_DIR / "compressed_data").get_layer("color").get_mag("1") ) write_data2 = (np.random.rand(3, 10, 10, 10) * 255).astype(np.uint8) @@ -1391,26 +1408,24 @@ def test_writing_subset_of_compressed_data_multi_channel() -> None: def test_writing_subset_of_compressed_data_single_channel() -> None: - delete_dir("./testoutput/compressed_data/") + delete_dir(TESTOUTPUT_DIR / "compressed_data") # create uncompressed dataset write_data1 = (np.random.rand(20, 40, 60) * 255).astype(np.uint8) - WKDataset.create( - os.path.abspath("./testoutput/compressed_data"), scale=(1, 1, 1) - ).add_layer("color", Layer.COLOR_TYPE).add_mag("1", block_len=8, file_len=8).write( - write_data1 - ) + WKDataset.create(TESTOUTPUT_DIR / "compressed_data", scale=(1, 1, 1)).add_layer( + "color", Layer.COLOR_TYPE + ).add_mag("1", block_len=8, file_len=8).write(write_data1) # compress data compress_mag_inplace( - os.path.abspath("./testoutput/compressed_data/"), + TESTOUTPUT_DIR / "compressed_data", layer_name="color", mag=Mag("1"), ) # open compressed dataset compressed_mag = ( - WKDataset("./testoutput/compressed_data").get_layer("color").get_mag("1") + WKDataset(TESTOUTPUT_DIR / "compressed_data").get_layer("color").get_mag("1") ) write_data2 = (np.random.rand(10, 10, 10) * 255).astype(np.uint8) @@ -1428,25 +1443,25 @@ def test_writing_subset_of_compressed_data_single_channel() -> None: def 
test_writing_subset_of_compressed_data() -> None: - delete_dir("./testoutput/compressed_data/") + delete_dir(TESTOUTPUT_DIR / "compressed_data") # create uncompressed dataset - WKDataset.create( - os.path.abspath("./testoutput/compressed_data"), scale=(1, 1, 1) - ).add_layer("color", Layer.COLOR_TYPE).add_mag("1", block_len=8, file_len=8).write( + WKDataset.create(TESTOUTPUT_DIR / "compressed_data", scale=(1, 1, 1)).add_layer( + "color", Layer.COLOR_TYPE + ).add_mag("1", block_len=8, file_len=8).write( (np.random.rand(20, 40, 60) * 255).astype(np.uint8) ) # compress data compress_mag_inplace( - os.path.abspath("./testoutput/compressed_data/"), + (TESTOUTPUT_DIR / "compressed_data").resolve(), layer_name="color", mag=Mag("1"), ) # open compressed dataset compressed_mag = ( - WKDataset("./testoutput/compressed_data").get_layer("color").get_mag("1") + WKDataset(TESTOUTPUT_DIR / "compressed_data").get_layer("color").get_mag("1") ) with pytest.raises(WKWException): @@ -1458,25 +1473,23 @@ def test_writing_subset_of_compressed_data() -> None: def test_writing_subset_of_chunked_compressed_data() -> None: - delete_dir("./testoutput/compressed_data/") + delete_dir(TESTOUTPUT_DIR / "compressed_data") # create uncompressed dataset write_data1 = (np.random.rand(100, 200, 300) * 255).astype(np.uint8) - WKDataset.create( - os.path.abspath("./testoutput/compressed_data"), scale=(1, 1, 1) - ).add_layer("color", Layer.COLOR_TYPE).add_mag("1", block_len=8, file_len=8).write( - write_data1 - ) + WKDataset.create(TESTOUTPUT_DIR / "compressed_data", scale=(1, 1, 1)).add_layer( + "color", Layer.COLOR_TYPE + ).add_mag("1", block_len=8, file_len=8).write(write_data1) # compress data compress_mag_inplace( - os.path.abspath("./testoutput/compressed_data/"), + TESTOUTPUT_DIR / "compressed_data", layer_name="color", mag=Mag("1"), ) # open compressed dataset - compressed_view = WKDataset("./testoutput/compressed_data").get_view( + compressed_view = WKDataset(TESTOUTPUT_DIR / "compressed_data").get_view( "color", "1", size=(100, 200, 300), is_bounded=True ) @@ -1504,21 +1517,25 @@ def test_writing_subset_of_chunked_compressed_data() -> None: def test_add_symlink_layer() -> None: - delete_dir("./testoutput/wk_dataset_with_symlink") - delete_dir("./testoutput/simple_wk_dataset_copy") - copytree("./testdata/simple_wk_dataset/", "./testoutput/simple_wk_dataset_copy/") + delete_dir(TESTOUTPUT_DIR / "wk_dataset_with_symlink") + delete_dir(TESTOUTPUT_DIR / "simple_wk_dataset_copy") + copytree( + TESTDATA_DIR / "simple_wk_dataset", TESTOUTPUT_DIR / "simple_wk_dataset_copy" + ) original_mag = ( - WKDataset("./testoutput/simple_wk_dataset_copy/") + WKDataset(TESTOUTPUT_DIR / "simple_wk_dataset_copy") .get_layer("color") .get_mag("1") ) - ds = WKDataset.create("./testoutput/wk_dataset_with_symlink", scale=(1, 1, 1)) - symlink_layer = ds.add_symlink_layer("./testoutput/simple_wk_dataset_copy/color/") + ds = WKDataset.create(TESTOUTPUT_DIR / "wk_dataset_with_symlink", scale=(1, 1, 1)) + symlink_layer = ds.add_symlink_layer( + TESTOUTPUT_DIR / "simple_wk_dataset_copy" / "color" + ) mag = symlink_layer.get_mag("1") - assert path.exists("./testoutput/wk_dataset_with_symlink/color/1") + assert (TESTOUTPUT_DIR / "wk_dataset_with_symlink" / "color" / "1").exists() assert len(ds.properties.data_layers) == 1 assert len(ds.properties.data_layers["color"].wkw_magnifications) == 1 @@ -1532,9 +1549,9 @@ def test_add_symlink_layer() -> None: def test_search_dataset_also_for_long_layer_name() -> None: - delete_dir("./testoutput/long_layer_name") 
+ delete_dir(TESTOUTPUT_DIR / "long_layer_name") - ds = WKDataset.create("./testoutput/long_layer_name", scale=(1, 1, 1)) + ds = WKDataset.create(TESTOUTPUT_DIR / "long_layer_name", scale=(1, 1, 1)) mag = ds.add_layer("color", Layer.COLOR_TYPE).add_mag("2") assert mag.name == "2" @@ -1559,7 +1576,7 @@ def test_search_dataset_also_for_long_layer_name() -> None: mag.read(offset=(10, 10, 10), size=(10, 10, 10)) # when opening the dataset, it searches both for the long and the short path - layer = WKDataset("./testoutput/long_layer_name").get_layer("color") + layer = WKDataset(TESTOUTPUT_DIR / "long_layer_name").get_layer("color") mag = layer.get_mag("2") assert np.array_equal( mag.read(offset=(10, 10, 10), size=(10, 10, 10)), np.expand_dims(write_data, 0) @@ -1568,9 +1585,9 @@ def test_search_dataset_also_for_long_layer_name() -> None: def test_outdated_dtype_parameter() -> None: - delete_dir("./testoutput/outdated_dtype") + delete_dir(TESTOUTPUT_DIR / "outdated_dtype") - ds = WKDataset.create("./testoutput/outdated_dtype", scale=(1, 1, 1)) + ds = WKDataset.create(TESTOUTPUT_DIR / "outdated_dtype", scale=(1, 1, 1)) with pytest.raises(ValueError): ds.get_or_add_layer("color", Layer.COLOR_TYPE, dtype=np.uint8, num_channels=1) @@ -1579,14 +1596,14 @@ def test_outdated_dtype_parameter() -> None: def test_dataset_conversion() -> None: - origin_wk_ds_path = "./testoutput/conversion/origin_wk/" - origin_tiff_ds_path = "./testoutput/conversion/origin_tiff/" + origin_wk_ds_path = TESTOUTPUT_DIR / "conversion" / "origin_wk" + origin_tiff_ds_path = TESTOUTPUT_DIR / "conversion" / "origin_tiff" - wk_to_tiff_ds_path = "./testoutput/conversion/wk_to_tiff/" - tiff_to_wk_ds_path = "./testoutput/conversion/tiff_to_wk/" + wk_to_tiff_ds_path = TESTOUTPUT_DIR / "conversion" / "wk_to_tiff" + tiff_to_wk_ds_path = TESTOUTPUT_DIR / "conversion" / "tiff_to_wk" - wk_to_tiff_to_wk_ds_path = "./testoutput/conversion/wk_to_tiff_to_wk/" - tiff_to_wk_to_tiff_ds_path = "./testoutput/conversion/tiff_to_wk_to_tiff/" + wk_to_tiff_to_wk_ds_path = TESTOUTPUT_DIR / "conversion" / "wk_to_tiff_to_wk" + tiff_to_wk_to_tiff_ds_path = TESTOUTPUT_DIR / "conversion" / "tiff_to_wk_to_tiff" delete_dir(origin_wk_ds_path) delete_dir(origin_tiff_ds_path) @@ -1684,16 +1701,18 @@ def test_dataset_conversion() -> None: def test_for_zipped_chunks() -> None: - delete_dir("./testoutput/zipped_chunking_source/") - delete_dir("./testoutput/zipped_chunking_target/") - copytree("./testdata/simple_wk_dataset/", "./testoutput/zipped_chunking_source/") + delete_dir(TESTOUTPUT_DIR / "zipped_chunking_source") + delete_dir(TESTOUTPUT_DIR / "zipped_chunking_target") + copytree( + TESTDATA_DIR / "simple_wk_dataset", TESTOUTPUT_DIR / "zipped_chunking_source" + ) - source_view = WKDataset("./testoutput/zipped_chunking_source/").get_view( + source_view = WKDataset(TESTOUTPUT_DIR / "zipped_chunking_source").get_view( "color", "1", size=(256, 256, 256), is_bounded=False ) target_mag = ( - WKDataset.create("./testoutput/zipped_chunking_target/", scale=(1, 1, 1)) + WKDataset.create(TESTOUTPUT_DIR / "zipped_chunking_target", scale=(1, 1, 1)) .get_or_add_layer( "color", Layer.COLOR_TYPE, dtype_per_channel="uint8", num_channels=3 ) @@ -1724,7 +1743,7 @@ def test_for_zipped_chunks() -> None: def test_for_zipped_chunks_invalid_target_chunk_size_wk() -> None: - delete_dir("./testoutput/zipped_chunking_source_invalid/") + delete_dir(TESTOUTPUT_DIR / "zipped_chunking_source_invalid") test_cases_wk = [ (10, 20, 30), @@ -1734,7 +1753,7 @@ def 
test_for_zipped_chunks_invalid_target_chunk_size_wk() -> None: ] layer = WKDataset.create( - "./testoutput/zipped_chunking_source_invalid/", scale=(1, 1, 1) + TESTOUTPUT_DIR / "zipped_chunking_source_invalid", scale=(1, 1, 1) ).get_or_add_layer("color", Layer.COLOR_TYPE) source_mag_dataset = layer.get_or_add_mag(1, block_len=8, file_len=8) target_mag_dataset = layer.get_or_add_mag(2, block_len=8, file_len=8) @@ -1760,7 +1779,7 @@ def func(args: Tuple[View, View, int]) -> None: def test_for_zipped_chunks_invalid_target_chunk_size_tiled_tiff() -> None: - delete_dir("./testoutput/zipped_chunking_source_invalid/") + delete_dir(TESTOUTPUT_DIR / "zipped_chunking_source_invalid") test_cases = [ (10, 20, 10), @@ -1769,7 +1788,7 @@ def test_for_zipped_chunks_invalid_target_chunk_size_tiled_tiff() -> None: ] layer = TiledTiffDataset.create( - "./testoutput/zipped_chunking_source_invalid/", + TESTOUTPUT_DIR / "zipped_chunking_source_invalid", scale=(1, 1, 1), tile_size=(64, 64), ).get_or_add_layer("color", Layer.COLOR_TYPE) @@ -1807,10 +1826,10 @@ def func(args: Tuple[View, View, int]) -> None: (s, t, i) = args for offset, size, chunk_size in test_cases: - delete_dir("./testoutput/zipped_chunking_source_invalid/") + delete_dir(TESTOUTPUT_DIR / "zipped_chunking_source_invalid") ds = TiffDataset.create( - "./testoutput/zipped_chunking_source_invalid/", scale=(1, 1, 1) + TESTOUTPUT_DIR / "zipped_chunking_source_invalid", scale=(1, 1, 1) ) color_layer = ds.get_or_add_layer("color", Layer.COLOR_TYPE) seg_layer = ds.get_or_add_layer( @@ -1838,8 +1857,8 @@ def func(args: Tuple[View, View, int]) -> None: def test_read_only_view() -> None: - delete_dir("./testoutput/read_only_view/") - ds = WKDataset.create("./testoutput/read_only_view/", scale=(1, 1, 1)) + delete_dir(TESTOUTPUT_DIR / "read_only_view") + ds = WKDataset.create(TESTOUTPUT_DIR / "read_only_view", scale=(1, 1, 1)) mag = ds.get_or_add_layer("color", Layer.COLOR_TYPE).get_or_add_mag("1") mag.write( data=(np.random.rand(1, 10, 10, 10) * 255).astype(np.uint8), offset=(10, 20, 30) @@ -1865,9 +1884,11 @@ def create_dataset(request: Any) -> Generator[MagDataset, None, None]: dataset_type = request.param with tempfile.TemporaryDirectory() as temp_dir: if dataset_type == TiledTiffDataset: - ds = dataset_type.create(temp_dir, scale=(2, 2, 1), tile_size=(64, 32)) + ds = dataset_type.create( + Path(temp_dir), scale=(2, 2, 1), tile_size=(64, 32) + ) else: - ds = dataset_type.create(temp_dir, scale=(2, 2, 1)) + ds = dataset_type.create(Path(temp_dir), scale=(2, 2, 1)) if dataset_type == WKDataset: mag = ds.add_layer("color", "color").add_mag( diff --git a/tests/test_downsampling.py b/tests/test_downsampling.py index eba74dd30..d08469131 100644 --- a/tests/test_downsampling.py +++ b/tests/test_downsampling.py @@ -1,4 +1,5 @@ import logging +from pathlib import Path from typing import Tuple, cast import numpy as np @@ -22,6 +23,8 @@ WKW_CUBE_SIZE = 1024 CUBE_EDGE_LEN = 256 +TESTOUTPUT_DIR = Path("testoutput") + def read_wkw( wkw_info: WkwDatasetInfo, offset: Tuple[int, int, int], size: Tuple[int, int, int] @@ -79,8 +82,8 @@ def test_non_linear_filter_reshape() -> None: def downsample_test_helper(use_compress: bool) -> None: - source_path = "testdata/WT1_wkw" - target_path = "testoutput/WT1_wkw" + source_path = Path("testdata", "WT1_wkw") + target_path = TESTOUTPUT_DIR / "WT1_wkw" try: shutil.rmtree(target_path) @@ -153,11 +156,11 @@ def test_downsample_multi_channel() -> None: file_len = 32 try: - shutil.rmtree("testoutput/multi-channel-test") + 
shutil.rmtree(TESTOUTPUT_DIR / "multi-channel-test") except: pass - ds = WKDataset.create("testoutput/multi-channel-test", (1, 1, 1)) + ds = WKDataset.create(TESTOUTPUT_DIR / "multi-channel-test", (1, 1, 1)) l = ds.add_layer( "color", Layer.COLOR_TYPE, dtype_per_channel="uint8", num_channels=num_channels ) @@ -249,7 +252,7 @@ def test_downsampling_padding() -> None: ), ] for args in padding_tests: - ds_path = "./testoutput/larger_wk_dataset/" + ds_path = TESTOUTPUT_DIR / "larger_wk_dataset" try: shutil.rmtree(ds_path) except: diff --git a/tests/test_image_readers.py b/tests/test_image_readers.py index a33c9674f..bb90eceba 100644 --- a/tests/test_image_readers.py +++ b/tests/test_image_readers.py @@ -1,3 +1,5 @@ +from pathlib import Path + from wkcuber.image_readers import ImageReaderManager import numpy as np @@ -5,7 +7,7 @@ def test_rgb_tiff_case() -> None: image_reader_manager = ImageReaderManager() result = image_reader_manager.read_array( - "./testdata/rgb_tiff/test_rgb.tif", np.uint8, 0 + Path("./testdata/rgb_tiff/test_rgb.tif"), np.uint8, 0 ) assert result.shape == (32, 32, 3, 1) assert np.all(result[0][0] == [[0], [255], [0]]) diff --git a/tests/test_mag.py b/tests/test_mag.py index 99d07b00a..e7a77e02e 100644 --- a/tests/test_mag.py +++ b/tests/test_mag.py @@ -1,10 +1,12 @@ +from pathlib import Path + import numpy as np from wkcuber.mag import Mag from wkcuber.metadata import detect_resolutions def test_detect_resolutions() -> None: - resolutions = sorted(list(detect_resolutions("testdata/WT1_wkw", "color"))) + resolutions = sorted(list(detect_resolutions(Path("testdata", "WT1_wkw"), "color"))) assert [mag.to_layer_name() for mag in resolutions] == ["1", "2-2-1"] diff --git a/tests/test_metadata.py b/tests/test_metadata.py index b7eaadefd..8778fe268 100644 --- a/tests/test_metadata.py +++ b/tests/test_metadata.py @@ -1,5 +1,6 @@ +from pathlib import Path + import numpy as np -import os import wkw from wkcuber.cubing import ensure_wkw @@ -14,7 +15,7 @@ def test_element_class_conversion() -> None: - test_wkw_path = os.path.join("testoutput", "test_metadata") + test_wkw_path = Path("testoutput", "test_metadata") prediction_layer_name = "prediction" prediction_wkw_info = WkwDatasetInfo( test_wkw_path, prediction_layer_name, 1, wkw.Header(np.float32, num_channels=3) @@ -40,7 +41,7 @@ def test_element_class_conversion() -> None: def check_element_class_of_layer( - test_wkw_path: str, + test_wkw_path: Path, layer_name: str, expected_element_class: str, expected_dtype: type, @@ -60,7 +61,7 @@ def check_element_class_of_layer( def write_custom_layer( - target_path: str, layer_name: str, dtype: type, num_channels: int + target_path: Path, layer_name: str, dtype: type, num_channels: int ) -> None: data = ( np.arange(4 * 4 * 4 * num_channels) @@ -84,7 +85,7 @@ def test_mapping_detection() -> None: "test_mapping_4.json", "test_mapping_5.json", ] - datapath_with_mappings = "testdata/test_metadata" + datapath_with_mappings = Path("testdata", "test_metadata") layer_name_with_mapping = "segmentation" detected_mappings = detect_mappings(datapath_with_mappings, layer_name_with_mapping) @@ -97,7 +98,7 @@ def test_mapping_detection() -> None: detected_mappings ), "Did not find all mappings." 
- datapath_without_mappings = "testdata/WT1_wkw" + datapath_without_mappings = Path("testdata", "WT1_wkw") layer_name_without_mapping = "color" detected_mappings = detect_mappings( datapath_without_mappings, layer_name_without_mapping diff --git a/tests/test_upsampling.py b/tests/test_upsampling.py index 606230074..d10fca1ae 100644 --- a/tests/test_upsampling.py +++ b/tests/test_upsampling.py @@ -1,5 +1,6 @@ import shutil import tempfile +from pathlib import Path from typing import Tuple, cast from wkcuber.api.Dataset import WKDataset @@ -15,7 +16,7 @@ def test_upsampling() -> None: with tempfile.TemporaryDirectory() as temp_dir: - ds = WKDataset.create(temp_dir, scale=(1, 1, 1)) + ds = WKDataset.create(Path(temp_dir), scale=(1, 1, 1)) layer = ds.add_layer("color", "COLOR") mag = layer.add_mag([4, 4, 2]) mag.write( @@ -48,7 +49,7 @@ def test_upsample_cube() -> None: def upsample_test_helper(use_compress: bool) -> None: with tempfile.TemporaryDirectory() as temp_dir: - ds = WKDataset.create(temp_dir, scale=(10.5, 10.5, 5)) + ds = WKDataset.create(Path(temp_dir), scale=(10.5, 10.5, 5)) layer = ds.add_layer("color", "COLOR") mag2 = layer.add_mag([2, 2, 2]) @@ -112,11 +113,11 @@ def test_upsample_multi_channel() -> None: file_len = 32 try: - shutil.rmtree("testoutput/multi-channel-test") + shutil.rmtree(Path("testoutput", "multi-channel-test")) except: pass - ds = WKDataset.create("testoutput/multi-channel-test", (1, 1, 1)) + ds = WKDataset.create(Path("testoutput", "multi-channel-test"), (1, 1, 1)) l = ds.add_layer( "color", Layer.COLOR_TYPE, dtype_per_channel="uint8", num_channels=num_channels ) diff --git a/tests/test_utils.py b/tests/test_utils.py index 131964cd3..2099216fa 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,3 +1,6 @@ +from pathlib import Path +from typing import Union + import numpy as np from wkcuber.utils import get_chunks, get_regular_chunks, BufferedSliceWriter import wkw @@ -8,7 +11,7 @@ BLOCK_LEN = 32 -def delete_dir(relative_path: str) -> None: +def delete_dir(relative_path: Union[str, Path]) -> None: if os.path.exists(relative_path) and os.path.isdir(relative_path): rmtree(relative_path) @@ -43,17 +46,17 @@ def test_buffered_slice_writer() -> None: dtype = test_img.dtype bbox = {"topleft": (0, 0, 0), "size": (24, 24, 35)} origin = [0, 0, 0] - dataset_dir = "testoutput/buffered_slice_writer" + dataset_dir = Path("testoutput", "buffered_slice_writer") layer_name = "color" mag = Mag(1) - dataset_path = os.path.join(dataset_dir, layer_name, mag.to_layer_name()) + dataset_path = dataset_dir / layer_name / mag.to_layer_name() delete_dir(dataset_dir) with BufferedSliceWriter(dataset_dir, layer_name, dtype, origin, mag=mag) as writer: for i in range(13): writer.write_slice(i, test_img) - with wkw.Dataset.open(dataset_path, wkw.Header(dtype)) as data: + with wkw.Dataset.open(str(dataset_path), wkw.Header(dtype)) as data: try: read_data = data.read(origin, (24, 24, 13)) if read_data[read_data.nonzero()].size != 0: @@ -67,7 +70,7 @@ def test_buffered_slice_writer() -> None: for i in range(13, 32): writer.write_slice(i, test_img) - with wkw.Dataset.open(dataset_path, wkw.Header(dtype)) as data: + with wkw.Dataset.open(str(dataset_path), wkw.Header(dtype)) as data: read_data = data.read(origin, (24, 24, 32)) assert np.squeeze(read_data).shape == (24, 24, 32), ( "The read data should have the shape: (24, 24, 32) " @@ -80,7 +83,7 @@ def test_buffered_slice_writer() -> None: for i in range(32, 35): writer.write_slice(i, test_img) - with wkw.Dataset.open(dataset_path, 
wkw.Header(dtype)) as data: + with wkw.Dataset.open(str(dataset_path), wkw.Header(dtype)) as data: read_data = data.read(origin, (24, 24, 35)) read_data = np.squeeze(read_data) assert read_data.shape == (24, 24, 35), ( diff --git a/wkcuber/__main__.py b/wkcuber/__main__.py index 6c6fe07de..cf9291925 100644 --- a/wkcuber/__main__.py +++ b/wkcuber/__main__.py @@ -13,10 +13,9 @@ from pathlib import Path -def detect_present_mags(target_path: str) -> Dict[Path, List[Mag]]: +def detect_present_mags(target_path: Path) -> Dict[Path, List[Mag]]: layer_path_to_mags: Dict[Path, List[Mag]] = dict() - dataset_path = Path(target_path) - layer_paths = list([p for p in dataset_path.iterdir() if p.is_dir()]) + layer_paths = list([p for p in target_path.iterdir() if p.is_dir()]) for layer_p in layer_paths: layer_path_to_mags.setdefault(layer_p, list()) mag_paths = list([p for p in layer_p.iterdir() if p.is_dir()]) diff --git a/wkcuber/api/Dataset.py b/wkcuber/api/Dataset.py index 2c17ec2d6..983a6cda6 100644 --- a/wkcuber/api/Dataset.py +++ b/wkcuber/api/Dataset.py @@ -89,9 +89,9 @@ def copy_job(args: Tuple[View, View, int]) -> None: class AbstractDataset(Generic[LayerT]): @abstractmethod - def __init__(self, dataset_path: Union[str, Path]) -> None: + def __init__(self, dataset_path: Path) -> None: properties: Properties = self._get_properties_type()._from_json( - join(dataset_path, Properties.FILE_NAME) + dataset_path / Properties.FILE_NAME ) self.layers: Dict[str, LayerT] = {} self.path = Path(properties.path).parent @@ -112,27 +112,26 @@ def __init__(self, dataset_path: Union[str, Path]) -> None: @classmethod def create_with_properties(cls, properties: Properties) -> "AbstractDataset": - dataset_path = path.dirname(properties.path) - - if os.path.exists(dataset_path): - assert os.path.isdir( - dataset_path - ), f"Creation of Dataset at {dataset_path} failed, because a file already exists at this path." + dataset_dir = properties.path.parent + if dataset_dir.exists(): + assert ( + dataset_dir.is_dir() + ), f"Creation of Dataset at {dataset_dir} failed, because a file already exists at this path." assert not os.listdir( - dataset_path - ), f"Creation of Dataset at {dataset_path} failed, because a non-empty folder already exists at this path." + dataset_dir + ), f"Creation of Dataset at {dataset_dir} failed, because a non-empty folder already exists at this path." # create directories on disk and write datasource-properties.json try: - makedirs(dataset_path, exist_ok=True) + makedirs(dataset_dir, exist_ok=True) properties._export_as_json() except OSError as e: raise type(e)( - "Creation of Dataset {} failed. ".format(dataset_path) + repr(e) + "Creation of Dataset {} failed. ".format(dataset_dir) + repr(e) ) # initialize object - return cls(dataset_path) + return cls(dataset_dir) def get_properties(self) -> Properties: return self.properties @@ -272,9 +271,9 @@ def delete_layer(self, layer_name: str) -> None: # delete files on disk rmtree(join(self.path, layer_name)) - def add_symlink_layer(self, foreign_layer_path: Union[str, Path]) -> LayerT: - foreign_layer_path = os.path.abspath(foreign_layer_path) - layer_name = os.path.basename(os.path.normpath(foreign_layer_path)) + def add_symlink_layer(self, foreign_layer_path: Path) -> LayerT: + foreign_layer_path = Path(os.path.abspath(foreign_layer_path)) + layer_name = foreign_layer_path.name if layer_name in self.layers.keys(): raise IndexError( f"Cannot create symlink to {foreign_layer_path}. This dataset already has a layer called {layer_name}." 
@@ -284,7 +283,7 @@ def add_symlink_layer(self, foreign_layer_path: Union[str, Path]) -> LayerT: # copy the properties of the layer into the properties of this dataset layer_properties = self._get_type()( - Path(foreign_layer_path).parent + foreign_layer_path.parent ).properties.data_layers[layer_name] self.properties.data_layers[layer_name] = layer_properties self.properties._export_as_json() @@ -378,7 +377,7 @@ def copy_dataset( def to_wk_dataset( self, - new_dataset_path: Union[str, Path], + new_dataset_path: Path, scale: Optional[Tuple[float, float, float]] = None, ) -> "WKDataset": if scale is None: @@ -389,7 +388,7 @@ def to_wk_dataset( def to_tiff_dataset( self, - new_dataset_path: Union[str, Path], + new_dataset_path: Path, scale: Optional[Tuple[float, float, float]] = None, pattern: Optional[str] = None, ) -> "TiffDataset": @@ -401,7 +400,7 @@ def to_tiff_dataset( def to_tiled_tiff_dataset( self, - new_dataset_path: Union[str, Path], + new_dataset_path: Path, tile_size: Tuple[int, int], scale: Optional[Tuple[float, float, float]] = None, pattern: Optional[str] = None, @@ -426,19 +425,19 @@ def _get_type(self) -> Type["AbstractDataset"]: class WKDataset(AbstractDataset[WKLayer]): @classmethod def create( - cls, dataset_path: Union[str, Path], scale: Tuple[float, float, float] + cls, dataset_path: Path, scale: Tuple[float, float, float] ) -> "WKDataset": name = basename(normpath(dataset_path)) - properties = WKProperties(join(dataset_path, Properties.FILE_NAME), name, scale) + properties = WKProperties(dataset_path / Properties.FILE_NAME, name, scale) return cast(WKDataset, WKDataset.create_with_properties(properties)) @classmethod def get_or_create( - cls, dataset_path: Union[str, Path], scale: Tuple[float, float, float] + cls, dataset_path: Path, scale: Tuple[float, float, float] ) -> "WKDataset": - if os.path.exists( - join(dataset_path, Properties.FILE_NAME) - ): # use the properties file to check if the Dataset exists + if ( + dataset_path / Properties.FILE_NAME + ).exists(): # use the properties file to check if the Dataset exists ds = WKDataset(dataset_path) assert tuple(ds.properties.scale) == tuple( scale @@ -447,7 +446,7 @@ def get_or_create( else: return cls.create(dataset_path, scale) - def __init__(self, dataset_path: Union[str, Path]) -> None: + def __init__(self, dataset_path: Path) -> None: super().__init__(dataset_path) self._data_format = "wkw" assert isinstance(self.properties, WKProperties) @@ -470,16 +469,16 @@ class TiffDataset(AbstractDataset[TiffLayer]): @classmethod def create( cls, - dataset_path: Union[str, Path], + dataset_path: Path, scale: Tuple[float, float, float], pattern: Optional[str] = None, ) -> "TiffDataset": if pattern is None: pattern = "{zzzzz}.tif" validate_pattern(pattern) - name = basename(normpath(dataset_path)) + name = dataset_path.name properties = TiffProperties( - join(dataset_path, "datasource-properties.json"), + dataset_path / "datasource-properties.json", name, scale, pattern=pattern, @@ -490,7 +489,7 @@ def create( @classmethod def get_or_create( cls, - dataset_path: Union[str, Path], + dataset_path: Path, scale: Tuple[float, float, float], pattern: str = None, ) -> "TiffDataset": @@ -512,7 +511,7 @@ def get_or_create( else: return cls.create(dataset_path, scale, pattern) - def __init__(self, dataset_path: Union[str, Path]) -> None: + def __init__(self, dataset_path: Path) -> None: super().__init__(dataset_path) self.data_format = "tiff" assert isinstance(self.properties, TiffProperties) @@ -535,7 +534,7 @@ class 
     @classmethod
     def create(
         cls,
-        dataset_path: Union[str, Path],
+        dataset_path: Path,
         scale: Tuple[float, float, float],
         tile_size: Tuple[int, int],
         pattern: Optional[str] = None,
@@ -543,9 +542,9 @@ def create(
         if pattern is None:
             pattern = "{xxxxx}/{yyyyy}/{zzzzz}.tif"
         validate_pattern(pattern)
-        name = basename(normpath(dataset_path))
+        name = dataset_path.name
         properties = TiffProperties(
-            join(dataset_path, "datasource-properties.json"),
+            dataset_path / "datasource-properties.json",
             name,
             scale,
             pattern=pattern,
@@ -558,7 +557,7 @@ def create(
 
     @classmethod
     def get_or_create(
         cls,
-        dataset_path: Union[str, Path],
+        dataset_path: Path,
         scale: Tuple[float, float, float],
         tile_size: Tuple[int, int],
         pattern: str = None,
@@ -585,7 +584,7 @@ def get_or_create(
         else:
             return cls.create(dataset_path, scale, tile_size, pattern)
 
-    def __init__(self, dataset_path: Union[str, Path]) -> None:
+    def __init__(self, dataset_path: Path) -> None:
         super().__init__(dataset_path)
         self.data_format = "tiled_tiff"
         assert isinstance(self.properties, TiffProperties)
diff --git a/wkcuber/api/Layer.py b/wkcuber/api/Layer.py
index 360237af9..3a49be7fe 100644
--- a/wkcuber/api/Layer.py
+++ b/wkcuber/api/Layer.py
@@ -453,7 +453,7 @@ def setup_mag(self, mag: Union[str, Mag]) -> None:
 
         try:
             with wkw.Dataset.open(
-                find_mag_path_on_disk(self.dataset.path, self.name, mag)
+                str(find_mag_path_on_disk(self.dataset.path, self.name, mag))
             ) as wkw_dataset:
                 wk_header = wkw_dataset.header
 
diff --git a/wkcuber/api/MagDataset.py b/wkcuber/api/MagDataset.py
index ed23f2678..4198d51bc 100644
--- a/wkcuber/api/MagDataset.py
+++ b/wkcuber/api/MagDataset.py
@@ -34,12 +34,10 @@
 from wkcuber.mag import Mag
 
 
-def find_mag_path_on_disk(
-    dataset_path: Union[str, Path], layer_name: str, mag_name: str
-) -> str:
+def find_mag_path_on_disk(dataset_path: Path, layer_name: str, mag_name: str) -> Path:
     mag = Mag(mag_name)
-    short_mag_file_path = join(dataset_path, layer_name, mag.to_layer_name())
-    long_mag_file_path = join(dataset_path, layer_name, mag.to_long_layer_name())
+    short_mag_file_path = dataset_path / layer_name / mag.to_layer_name()
+    long_mag_file_path = dataset_path / layer_name / mag.to_long_layer_name()
     if os.path.exists(short_mag_file_path):
         return short_mag_file_path
     else:
@@ -213,7 +211,7 @@ def get_bounding_boxes_on_disk(
         assert self.view.dataset is not None
         for filename in self.view.dataset.list_files():
-            file_path = os.path.relpath(os.path.splitext(filename)[0], self.view.path)
+            file_path = Path(os.path.splitext(filename)[0]).relative_to(self.view.path)
             cube_index = self._extract_file_index(file_path)
             cube_offset = [idx * size for idx, size in zip(cube_index, cube_size)]
 
@@ -222,7 +220,7 @@
         if not was_opened:
             self.close()
 
-    def _extract_file_index(self, file_path: str) -> Tuple[int, int, int]:
+    def _extract_file_index(self, file_path: Path) -> Tuple[int, int, int]:
         raise NotImplementedError
 
 
@@ -263,8 +261,8 @@ def _get_view_type(self) -> Type[WKView]:
     def _get_file_dimensions(self) -> Tuple[int, int, int]:
         return cast(Tuple[int, int, int], (self.file_len * self.block_len,) * 3)
 
-    def _extract_file_index(self, file_path: str) -> Tuple[int, int, int]:
-        zyx_index = [int(el[1:]) for el in file_path.split("/")]
+    def _extract_file_index(self, file_path: Path) -> Tuple[int, int, int]:
+        zyx_index = [int(el[1:]) for el in file_path.parts]
         return zyx_index[2], zyx_index[1], zyx_index[0]
 
 
@@ -295,10 +293,10 @@ def _get_file_dimensions(self) -> Tuple[int, int, int]:
         return self.view.size[0], self.view.size[1], 1
 
-    def _extract_file_index(self, file_path: str) -> Tuple[int, int, int]:
-        x_list = detect_value(self.pattern, file_path, "x", ["y", "z"])
-        y_list = detect_value(self.pattern, file_path, "y", ["x", "z"])
-        z_list = detect_value(self.pattern, file_path, "z", ["x", "y"])
+    def _extract_file_index(self, file_path: Path) -> Tuple[int, int, int]:
+        x_list = detect_value(self.pattern, str(file_path), "x", ["y", "z"])
+        y_list = detect_value(self.pattern, str(file_path), "y", ["x", "z"])
+        z_list = detect_value(self.pattern, str(file_path), "z", ["x", "y"])
         x = x_list[0] if len(x_list) == 1 else 0
         y = y_list[0] if len(y_list) == 1 else 0
         z = z_list[0] if len(z_list) == 1 else 0
diff --git a/wkcuber/api/Properties/DatasetProperties.py b/wkcuber/api/Properties/DatasetProperties.py
index 73713b053..a5251c35d 100644
--- a/wkcuber/api/Properties/DatasetProperties.py
+++ b/wkcuber/api/Properties/DatasetProperties.py
@@ -17,7 +17,7 @@ class Properties:
 
     def __init__(
         self,
-        path: Union[str, Path],
+        path: Path,
         name: str,
         scale: Tuple[float, float, float],
         team: str = "",
@@ -33,7 +33,7 @@ def __init__(
         self._data_layers = data_layers
 
     @classmethod
-    def _from_json(cls, path: Union[str, Path]) -> "Properties":
+    def _from_json(cls, path: Path) -> "Properties":
         pass
 
     def _export_as_json(self) -> None:
@@ -99,8 +99,8 @@ def name(self) -> str:
         return self._name
 
     @property
-    def path(self) -> str:
-        return self._path
+    def path(self) -> Path:
+        return Path(self._path)
 
     @property
     def team(self) -> str:
@@ -117,7 +117,7 @@ def data_layers(self) -> dict:
 
 class WKProperties(Properties):
     @classmethod
-    def _from_json(cls, path: Union[str, Path]) -> "WKProperties":
+    def _from_json(cls, path: Path) -> "WKProperties":
         with open(path) as datasource_properties:
             data = json.load(datasource_properties)
 
@@ -167,7 +167,7 @@ def _add_mag(self, layer_name: str, mag: str, **kwargs: int) -> None:
 class TiffProperties(Properties):
     def __init__(
         self,
-        path: Union[str, Path],
+        path: Path,
         name: str,
         scale: Tuple[float, float, float],
         pattern: str,
@@ -180,7 +180,7 @@ def __init__(
         self.tile_size = tile_size
 
     @classmethod
-    def _from_json(cls, path: Union[str, Path]) -> Properties:
+    def _from_json(cls, path: Path) -> Properties:
         with open(path) as datasource_properties:
             data = json.load(datasource_properties)
 
diff --git a/wkcuber/api/Properties/LayerProperties.py b/wkcuber/api/Properties/LayerProperties.py
index c0c81dae6..3ded0fa61 100644
--- a/wkcuber/api/Properties/LayerProperties.py
+++ b/wkcuber/api/Properties/LayerProperties.py
@@ -11,7 +11,7 @@
 
 def extract_num_channels(
     num_channels_in_properties: Optional[int],
-    path: Union[str, Path],
+    path: Path,
     layer: str,
     mag: Optional[Dict[str, int]],
 ) -> int:
@@ -90,7 +90,7 @@ def _from_json(
         cls,
         json_data: Dict[str, Any],
         resolution_type: Type[Resolution],
-        dataset_path: Union[str, Path],
+        dataset_path: Path,
     ) -> "LayerProperties":
         # create LayerProperties without resolutions
         layer_properties = cls(
@@ -220,7 +220,7 @@ def _from_json(
         cls,
         json_data: Dict[str, Any],
         resolution_type: Type[Resolution],
-        dataset_path: Union[str, Path],
+        dataset_path: Path,
     ) -> "SegmentationLayerProperties":
         # create LayerProperties without resolutions
         layer_properties = cls(
diff --git a/wkcuber/api/TiffData/TiffMag.py b/wkcuber/api/TiffData/TiffMag.py
index 1e62ddc36..5d5d8365f 100644
--- a/wkcuber/api/TiffData/TiffMag.py
+++ b/wkcuber/api/TiffData/TiffMag.py
@@ -26,7 +26,7 @@ def replace_coordinate(pattern: str, coord_id: str, coord: int) -> str:
 
 def detect_tile_ranges(
-    tiled_dataset_path_parent: Optional[str], tiled_dataset_path_pattern: Optional[str]
+    tiled_dataset_path_parent: Optional[Path], tiled_dataset_path_pattern: Optional[str]
 ) -> Tuple[range, range, range]:
     if tiled_dataset_path_pattern is not None:
         if tiled_dataset_path_parent is not None:
@@ -182,7 +182,7 @@ def __init__(
 
 class TiffMag:
-    def __init__(self, root: str, header: TiffMagHeader) -> None:
+    def __init__(self, root: Path, header: TiffMagHeader) -> None:
         self.root = root
         self.tiffs = dict()
 
@@ -299,7 +299,8 @@ def write(self, off: Tuple[int, int, int], data: np.ndarray) -> None:
     def list_files(self) -> Iterator[str]:
         _, file_extension = os.path.splitext(self.header.pattern)
         return iglob(
-            self.root + "/" + re.sub(r"{.*?}", "*", self.header.pattern), recursive=True
+            str(self.root) + "/" + re.sub(r"{.*?}", "*", self.header.pattern),
+            recursive=True,
         )
 
     def close(self) -> None:
@@ -409,12 +410,12 @@ def assert_correct_data_format(self, data: np.ndarray) -> None:
 
     def get_file_name_for_layer(
         self, xyz: Tuple[Optional[int], Optional[int], Optional[int]]
-    ) -> str:
+    ) -> Path:
         x, y, z = xyz
-        return os.path.join(self.root, to_file_name(self.header.pattern, x, y, z))
+        return self.root / to_file_name(self.header.pattern, x, y, z)
 
     @staticmethod
-    def open(root: str, header: TiffMagHeader = None) -> "TiffMag":
+    def open(root: Path, header: TiffMagHeader = None) -> "TiffMag":
         if header is None:
             header = TiffMagHeader()
         return TiffMag(root, header)
@@ -441,26 +442,28 @@ def transpose_for_skimage(data: np.ndarray) -> np.ndarray:
 
 class TiffReader:
-    def __init__(self, file_name: str):
+    def __init__(self, file_name: Path):
         self.file_name = file_name
 
     @classmethod
-    def init_tiff(cls, pixels: np.ndarray, file_name: str) -> "TiffReader":
+    def init_tiff(cls, pixels: np.ndarray, file_name: Path) -> "TiffReader":
         tr = TiffReader(file_name)
         tr.write(pixels)
         return tr
 
     @classmethod
-    def open(cls, file_name: str) -> "TiffReader":
+    def open(cls, file_name: Path) -> "TiffReader":
         return cls(file_name)
 
     def read(self) -> np.array:
-        data = io.imread(self.file_name)
+        data = io.imread(str(self.file_name))
         return transpose_for_skimage(data)
 
     def write(self, pixels: np.ndarray) -> None:
-        os.makedirs(os.path.dirname(self.file_name), exist_ok=True)
-        io.imsave(self.file_name, transpose_for_skimage(pixels), check_contrast=False)
+        os.makedirs(self.file_name.parent, exist_ok=True)
+        io.imsave(
+            str(self.file_name), transpose_for_skimage(pixels), check_contrast=False
+        )
 
     def merge_with_image(
         self, foreground_pixels: np.ndarray, offset: Tuple[int, int]
diff --git a/wkcuber/api/View.py b/wkcuber/api/View.py
index 9f573e86c..9fd4a051f 100644
--- a/wkcuber/api/View.py
+++ b/wkcuber/api/View.py
@@ -17,7 +17,7 @@ class View:
     def __init__(
         self,
-        path_to_mag_dataset: str,
+        path_to_mag_dataset: Path,
         header: Union[TiffMagHeader, wkw.Header],
         size: Tuple[int, int, int],
         global_offset: Tuple[int, int, int] = (0, 0, 0),
@@ -312,7 +312,7 @@ def open(self) -> "WKView":
             raise Exception("Cannot open view: the view is already opened")
         else:
             self.dataset = Dataset.open(
-                self.path
+                str(self.path)
             )  # No need to pass the header to the wkw.Dataset
             self._is_opened = True
         return self
diff --git a/wkcuber/check_equality.py b/wkcuber/check_equality.py
index 8eb9118f3..f527299db 100644
--- a/wkcuber/check_equality.py
+++ b/wkcuber/check_equality.py
@@ -1,5 +1,6 @@
 import logging
 from argparse import ArgumentParser, Namespace
+from pathlib import Path
 from typing import Any, Callable
 
 from wkcuber.api.Dataset import WKDataset
@@ -36,10 +37,12 @@ def named_partial(func: Callable, *args: Any, **kwargs: Any) -> Callable:
 
 def create_parser() -> ArgumentParser:
     parser = ArgumentParser()
 
-    parser.add_argument("source_path", help="Path to input WKW dataset")
+    parser.add_argument("source_path", help="Path to input WKW dataset", type=Path)
 
     parser.add_argument(
-        "target_path", help="WKW dataset with which to compare the input dataset."
+        "target_path",
+        help="WKW dataset with which to compare the input dataset.",
+        type=Path,
     )
 
     parser.add_argument(
@@ -56,8 +59,8 @@ def create_parser() -> ArgumentParser:
 
 def assert_equality_for_chunk(
-    source_path: str,
-    target_path: str,
+    source_path: Path,
+    target_path: Path,
     layer_name: str,
     mag: Mag,
     sub_box: BoundingBoxNamedTuple,
@@ -76,7 +79,9 @@ def assert_equality_for_chunk(
     ), f"Data differs in bounding box {sub_box} for layer {layer_name} with mag {mag}"
 
 
-def check_equality(source_path: str, target_path: str, args: Namespace = None) -> None:
+def check_equality(
+    source_path: Path, target_path: Path, args: Namespace = None
+) -> None:
 
     logging.info(f"Comparing {source_path} with {target_path}")
diff --git a/wkcuber/compress.py b/wkcuber/compress.py
index c9c0a7868..b8b8da6fb 100644
--- a/wkcuber/compress.py
+++ b/wkcuber/compress.py
@@ -1,4 +1,6 @@
 import time
+from pathlib import Path
+
 import wkw
 import shutil
 import logging
@@ -26,7 +28,7 @@ def create_parser() -> ArgumentParser:
     parser = ArgumentParser()
 
     parser.add_argument(
-        "source_path", help="Directory containing the source WKW dataset."
+        "source_path", help="Directory containing the source WKW dataset.", type=Path
     )
 
     parser.add_argument(
@@ -34,6 +36,7 @@ def create_parser() -> ArgumentParser:
         help="Output directory for the compressed WKW dataset.",
         nargs="?",
         default=None,
+        type=Path,
     )
 
     parser.add_argument(
@@ -76,9 +79,9 @@ def compress_file_job(args: Tuple[str, str]) -> None:
 
 def compress_mag(
-    source_path: str,
+    source_path: Path,
     layer_name: str,
-    target_path: str,
+    target_path: Path,
     mag: Mag,
     args: Namespace = None,
 ) -> None:
@@ -110,9 +113,9 @@ def compress_mag(
 
 def compress_mag_inplace(
-    target_path: str, layer_name: str, mag: Mag, args: Namespace = None
+    target_path: Path, layer_name: str, mag: Mag, args: Namespace = None
 ) -> None:
-    compress_target_path = "{}.compress-{}".format(target_path, uuid4())
+    compress_target_path = Path("{}.compress-{}".format(target_path, uuid4()))
     compress_mag(target_path, layer_name, compress_target_path, mag, args)
 
     shutil.rmtree(path.join(target_path, layer_name, str(mag)))
@@ -124,14 +127,14 @@ def compress_mag_inplace(
 
 def compress_mags(
-    source_path: str,
+    source_path: Path,
     layer_name: str,
-    target_path: str = None,
+    target_path: Path = None,
     mags: List[Mag] = None,
     args: Namespace = None,
 ) -> None:
     if target_path is None:
-        target = source_path + ".tmp"
+        target = source_path.with_suffix(".tmp")
     else:
         target = target_path
@@ -143,15 +146,16 @@ def compress_mags(
         compress_mag(source_path, layer_name, target, mag, args)
 
     if target_path is None:
-        makedirs(path.join(source_path + BACKUP_EXT, layer_name), exist_ok=True)
+        backup_dir = source_path.with_suffix(BACKUP_EXT)
+        makedirs(backup_dir / layer_name, exist_ok=True)
         for mag in mags:
             shutil.move(
-                path.join(source_path, layer_name, str(mag)),
-                path.join(source_path + BACKUP_EXT, layer_name, str(mag)),
+                str(source_path / layer_name / str(mag)),
+                str(backup_dir / layer_name / str(mag)),
             )
             shutil.move(
-                path.join(target, layer_name, str(mag)),
-                path.join(source_path, layer_name, str(mag)),
+                str(target / layer_name / str(mag)),
+                str(source_path / layer_name / str(mag)),
             )
         shutil.rmtree(target)
         logging.info(
diff --git a/wkcuber/convert_knossos.py b/wkcuber/convert_knossos.py
index b3aab981a..b5b8cc309 100644
--- a/wkcuber/convert_knossos.py
+++ b/wkcuber/convert_knossos.py
@@ -1,5 +1,6 @@
 import time
 import logging
+from pathlib import Path
 from typing import Tuple, cast
 
 import wkw
@@ -25,11 +26,13 @@ def create_parser() -> ArgumentParser:
     parser = ArgumentParser()
 
     parser.add_argument(
-        "source_path", help="Directory containing the source KNOSSOS dataset."
+        "source_path",
+        help="Directory containing the source KNOSSOS dataset.",
+        type=Path,
     )
 
     parser.add_argument(
-        "target_path", help="Output directory for the generated WKW dataset."
+        "target_path", help="Output directory for the generated WKW dataset.", type=Path
     )
 
     parser.add_argument(
@@ -76,8 +79,8 @@ def convert_cube_job(
 
 def convert_knossos(
-    source_path: str,
-    target_path: str,
+    source_path: Path,
+    target_path: Path,
     layer_name: str,
     dtype: str,
     mag: int = 1,
@@ -109,7 +112,12 @@ def convert_knossos(
 
 def main(args: Namespace) -> None:
     convert_knossos(
-        args.source_path, args.target_path, args.layer_name, args.dtype, args.mag, args
+        args.source_path,
+        args.target_path,
+        args.layer_name,
+        args.dtype,
+        args.mag,
+        args,
     )
diff --git a/wkcuber/convert_nifti.py b/wkcuber/convert_nifti.py
index 2f991ff7d..f38cecaa1 100644
--- a/wkcuber/convert_nifti.py
+++ b/wkcuber/convert_nifti.py
@@ -28,10 +28,11 @@ def create_parser() -> ArgumentParser:
         "source_path",
         help="Path to NIFTY file or to a directory if multiple NIFTI files should be converted. "
         "In the latter case, also see --color_file and --segmentation_file.",
+        type=Path,
     )
 
     parser.add_argument(
-        "target_path", help="Output directory for the generated WKW dataset."
+        "target_path", help="Output directory for the generated WKW dataset.", type=Path
     )
 
     parser.add_argument(
@@ -321,7 +322,7 @@ def convert_folder_nifti(
 
 def main(args: Namespace) -> None:
-    source_path = Path(args.source_path)
+    source_path = args.source_path
 
     flip_axes = None
     if args.flip_axes is not None:
@@ -342,7 +343,7 @@ def main(args: Namespace) -> None:
     if source_path.is_dir():
         convert_folder_nifti(
             source_path,
-            Path(args.target_path),
+            args.target_path,
             args.color_file,
             args.segmentation_file,
             **conversion_args,
@@ -350,7 +351,7 @@ def main(args: Namespace) -> None:
     else:
         convert_nifti(
             source_path,
-            Path(args.target_path),
+            args.target_path,
            args.layer_name,
             args.dtype,
             is_segmentation_layer=args.is_segmentation_layer,
diff --git a/wkcuber/converter.py b/wkcuber/converter.py
index 92ecb58af..d45cf2ec0 100644
--- a/wkcuber/converter.py
+++ b/wkcuber/converter.py
@@ -23,11 +23,13 @@ def create_parser() -> ArgumentParser:
     )
 
     parser.add_argument(
-        "source_path", help="Input file or directory containing the input files."
+        "source_path",
+        help="Input file or directory containing the input files.",
+        type=Path,
     )
 
     parser.add_argument(
-        "target_path", help="Output directory for the generated dataset."
+ "target_path", help="Output directory for the generated dataset.", type=Path ) add_scale_flag(parser) @@ -356,7 +358,7 @@ def convert_input(self, args: Namespace) -> bool: for layer_path, layer_name in layer_path_to_name.items(): args.layer_name = layer_name - args.source_path = layer_path + args.source_path = Path(layer_path) cube_image_stack( args.source_path, args.target_path, diff --git a/wkcuber/cubing.py b/wkcuber/cubing.py index 9d866ab4d..19ab4095d 100644 --- a/wkcuber/cubing.py +++ b/wkcuber/cubing.py @@ -38,10 +38,12 @@ def create_parser() -> ArgumentParser: parser = ArgumentParser() - parser.add_argument("source_path", help="Directory containing the input images.") + parser.add_argument( + "source_path", help="Directory containing the input images.", type=Path + ) parser.add_argument( - "target_path", help="Output directory for the generated dataset." + "target_path", help="Output directory for the generated dataset.", type=Path ) parser.add_argument( @@ -95,17 +97,18 @@ def create_parser() -> ArgumentParser: return parser -def find_source_filenames(source_path: str) -> List[str]: +def find_source_filenames(source_path: Path) -> List[Path]: # Find all source files that have a matching file extension + if source_path.is_dir(): + source_path_str = path.join(source_path, "*") + else: + source_path_str = str(source_path) - if Path(source_path).is_dir(): - source_path = path.join(source_path, "*") - - source_files = list(find_files(source_path, image_reader.readers.keys())) + source_files = list(find_files(source_path_str, image_reader.readers.keys())) assert len(source_files) > 0, ( "No image files found in path " - + source_path + + source_path_str + ". Supported suffixes are " + str(image_reader.readers.keys()) + "." @@ -114,7 +117,7 @@ def find_source_filenames(source_path: str) -> List[str]: return natsorted(source_files) -def read_image_file(file_name: str, dtype: type, z_slice: int) -> np.ndarray: +def read_image_file(file_name: Path, dtype: type, z_slice: int) -> np.ndarray: try: return image_reader.read_array(file_name, dtype, z_slice) except Exception as exc: @@ -235,8 +238,8 @@ def cubing_job( def cubing( - source_path: str, - target_path: str, + source_path: Path, + target_path: Path, layer_name: str, batch_size: Optional[int], args: Namespace, diff --git a/wkcuber/downsampling.py b/wkcuber/downsampling.py index 5b4de342a..11b12994a 100644 --- a/wkcuber/downsampling.py +++ b/wkcuber/downsampling.py @@ -1,3 +1,4 @@ +from pathlib import Path from typing import Optional, Tuple from argparse import ArgumentParser, Namespace @@ -18,7 +19,7 @@ def create_parser() -> ArgumentParser: parser = ArgumentParser() - parser.add_argument("path", help="Directory containing the dataset.") + parser.add_argument("path", help="Directory containing the dataset.", type=Path) parser.add_argument( "--layer_name", @@ -84,7 +85,7 @@ def create_parser() -> ArgumentParser: def downsample_mags( - path: str, + path: Path, layer_name: str = None, from_mag: Mag = None, max_mag: Mag = Mag(32), @@ -106,7 +107,7 @@ def downsample_mags( if not layer_name or not from_mag: layer_name = os.path.basename(os.path.dirname(path)) from_mag = Mag(os.path.basename(path)) - path = os.path.dirname(os.path.dirname(path)) + path = path.parent.parent WKDataset(path).get_layer(layer_name).downsample( from_mag=from_mag, @@ -121,7 +122,7 @@ def downsample_mags( def downsample_mags_isotropic( - path: str, + path: Path, layer_name: str, from_mag: Mag, max_mag: Optional[Mag], @@ -143,7 +144,7 @@ def downsample_mags_isotropic( 
 
 def downsample_mags_anisotropic(
-    path: str,
+    path: Path,
     layer_name: str,
     from_mag: Mag,
     max_mag: Optional[Mag],
diff --git a/wkcuber/export_wkw_as_tiff.py b/wkcuber/export_wkw_as_tiff.py
index 00aea420a..90ae5676c 100644
--- a/wkcuber/export_wkw_as_tiff.py
+++ b/wkcuber/export_wkw_as_tiff.py
@@ -1,6 +1,8 @@
 from argparse import ArgumentParser, Namespace
 from functools import partial
 import logging
+from pathlib import Path
+
 import wkw
 import os
 from math import ceil
@@ -25,7 +27,11 @@ def create_parser() -> ArgumentParser:
     parser = ArgumentParser()
 
     parser.add_argument(
-        "--source_path", "-s", help="Directory containing the wkw file.", required=True
+        "--source_path",
+        "-s",
+        help="Directory containing the wkw file.",
+        required=True,
+        type=Path,
     )
 
     parser.add_argument(
@@ -33,6 +39,7 @@ def create_parser() -> ArgumentParser:
         "-d",
         help="Output directory for the generated tiff files.",
         required=True,
+        type=Path,
     )
 
     parser.add_argument(
@@ -119,9 +126,9 @@ def wkw_slice_to_image(data_slice: np.ndarray, downsample: int = 1) -> Image:
 
 def export_tiff_slice(
     tiff_bbox: Dict[str, Tuple[int, int, int]],
-    dest_path: str,
+    dest_path: Path,
     name: str,
-    dataset_path: str,
+    dataset_path: Path,
     tiling_size: Union[None, Tuple[int, int]],
     batch_size: int,
     downsample: int,
@@ -144,7 +151,7 @@ def export_tiff_slice(
         (tiff_bbox["topleft"][2] + batch_number * batch_size) // mag.mag[2],
     )
 
-    with wkw.Dataset.open(dataset_path) as dataset:
+    with wkw.Dataset.open(str(dataset_path)) as dataset:
         if tiling_size is None:
             tiff_data = dataset.read(tiff_bbox["topleft"], tiff_bbox["size"])
         else:
@@ -158,7 +165,7 @@ def export_tiff_slice(
             slice_name_number = batch_number * batch_size + slice_index + 1
             if tiling_size is None:
                 tiff_file_name = wkw_name_and_bbox_to_tiff_name(name, slice_name_number)
-                tiff_file_path = os.path.join(dest_path, tiff_file_name)
+                tiff_file_path = dest_path / tiff_file_name
 
                 image = wkw_slice_to_image(tiff_data[:, :, :, slice_index], downsample)
                 image.save(tiff_file_path)
@@ -166,8 +173,8 @@ def export_tiff_slice(
             else:
                 for y_tile_index in range(ceil(tiff_bbox["size"][1] / tiling_size[1])):
-                    tile_tiff_path = os.path.join(
-                        dest_path, str(slice_name_number), str(y_tile_index + 1)
+                    tile_tiff_path = (
+                        dest_path / str(slice_name_number) / str(y_tile_index + 1)
                     )
                     os.makedirs(tile_tiff_path, exist_ok=True)
                     for x_tile_index in range(
@@ -198,11 +205,11 @@ def export_tiff_slice(
 
 def export_tiff_stack(
-    wkw_file_path: str,
+    wkw_file_path: Path,
     wkw_layer: str,
     bbox: Dict[str, List[int]],
     mag: Mag,
-    destination_path: str,
+    destination_path: Path,
     name: str,
     tiling_slice_size: Union[None, Tuple[int, int]],
     batch_size: int,
@@ -210,7 +217,7 @@ def export_tiff_stack(
     args: Namespace,
 ) -> None:
     os.makedirs(destination_path, exist_ok=True)
-    dataset_path = os.path.join(wkw_file_path, wkw_layer, mag.to_layer_name())
+    dataset_path = wkw_file_path / wkw_layer / mag.to_layer_name()
 
     with get_executor_for_args(args) as executor:
         num_slices = ceil(bbox["size"][2] / batch_size)
diff --git a/wkcuber/image_readers.py b/wkcuber/image_readers.py
index caba50572..49e8840d4 100644
--- a/wkcuber/image_readers.py
+++ b/wkcuber/image_readers.py
@@ -1,3 +1,4 @@
+from pathlib import Path
 from typing import Tuple, Dict, Union, Optional
 
 import numpy as np
@@ -15,36 +16,36 @@
 
 class ImageReader:
-    def read_array(self, file_name: str, dtype: np.dtype, z_slice: int) -> np.ndarray:
+    def read_array(self, file_name: Path, dtype: np.dtype, z_slice: int) -> np.ndarray:
         pass
 
-    def read_dimensions(self, file_name: str) -> Tuple[int, int]:
+    def read_dimensions(self, file_name: Path) -> Tuple[int, int]:
         pass
 
-    def read_channel_count(self, file_name: str) -> int:
+    def read_channel_count(self, file_name: Path) -> int:
         pass
 
     def read_z_slices_per_file(
-        self, file_name: str  # pylint: disable=unused-argument
+        self, file_name: Path  # pylint: disable=unused-argument
     ) -> int:
         return 1
 
-    def read_dtype(self, file_name: str) -> str:
+    def read_dtype(self, file_name: Path) -> str:
         raise NotImplementedError()
 
 
 class PillowImageReader(ImageReader):
-    def read_array(self, file_name: str, dtype: np.dtype, z_slice: int) -> np.ndarray:
+    def read_array(self, file_name: Path, dtype: np.dtype, z_slice: int) -> np.ndarray:
         this_layer = np.array(Image.open(file_name), dtype)
         this_layer = this_layer.swapaxes(0, 1)
         this_layer = this_layer.reshape(this_layer.shape + (1,))
         return this_layer
 
-    def read_dimensions(self, file_name: str) -> Tuple[int, int]:
+    def read_dimensions(self, file_name: Path) -> Tuple[int, int]:
         with Image.open(file_name) as test_img:
             return test_img.width, test_img.height
 
-    def read_channel_count(self, file_name: str) -> int:
+    def read_channel_count(self, file_name: Path) -> int:
         with Image.open(file_name) as test_img:
             this_layer = np.array(test_img)
             if this_layer.ndim == 2:
@@ -53,7 +54,7 @@ def read_channel_count(self, file_name: str) -> int:
         else:
             return this_layer.shape[-1]  # pylint: disable=unsubscriptable-object
 
-    def read_dtype(self, file_name: str) -> str:
+    def read_dtype(self, file_name: Path) -> str:
         return np.array(Image.open(file_name)).dtype.name
 
 
@@ -63,22 +64,22 @@ def to_target_datatype(data: np.ndarray, target_dtype: np.dtype) -> np.ndarray:
 
 class Dm3ImageReader(ImageReader):
-    def read_array(self, file_name: str, dtype: np.dtype, z_slice: int) -> np.ndarray:
+    def read_array(self, file_name: Path, dtype: np.dtype, z_slice: int) -> np.ndarray:
         dm3_file = DM3(file_name)
         this_layer = to_target_datatype(dm3_file.imagedata, dtype)
         this_layer = this_layer.swapaxes(0, 1)
         this_layer = this_layer.reshape(this_layer.shape + (1,))
         return this_layer
 
-    def read_dimensions(self, file_name: str) -> Tuple[int, int]:
+    def read_dimensions(self, file_name: Path) -> Tuple[int, int]:
         test_img = DM3(file_name)
         return test_img.width, test_img.height
 
-    def read_channel_count(self, _file_name: str) -> int:
+    def read_channel_count(self, _file_name: Path) -> int:
         logging.info("Assuming single channel for DM3 data")
         return 1
 
-    def read_dtype(self, file_name: str) -> str:
+    def read_dtype(self, file_name: Path) -> str:
         return DM3(file_name).imagedata.dtype.name
 
 
@@ -105,7 +106,7 @@ def _read_dimensions(
         )
         return width, height
 
-    def read_array(self, file_name: str, dtype: np.dtype, z_slice: int) -> np.ndarray:
+    def read_array(self, file_name: Path, dtype: np.dtype, z_slice: int) -> np.ndarray:
         dm4file = DM4File.open(file_name)
         image_data_tag, image_tag = self._read_tags(dm4file)
         width, height = self._read_dimensions(dm4file, image_data_tag)
@@ -120,7 +121,7 @@ def read_array(self, file_name: str, dtype: np.dtype, z_slice: int) -> np.ndarray:
 
         return data
 
-    def read_dimensions(self, file_name: str) -> Tuple[int, int]:
+    def read_dimensions(self, file_name: Path) -> Tuple[int, int]:
         dm4file = DM4File.open(file_name)
         image_data_tag, _ = self._read_tags(dm4file)
         dimensions = self._read_dimensions(dm4file, image_data_tag)
@@ -128,11 +129,11 @@ def read_dimensions(self, file_name: str) -> Tuple[int, int]:
 
         return dimensions
 
-    def read_channel_count(self, _file_name: str) -> int:
+    def read_channel_count(self, _file_name: Path) -> int:
         logging.info("Assuming single channel for DM4 data")
         return 1
 
-    def read_dtype(self, file_name: str) -> str:  # pylint: disable=unused-argument
+    def read_dtype(self, file_name: Path) -> str:  # pylint: disable=unused-argument
         # DM4 standard input type is uint16
         return "uint16"
 
@@ -152,7 +153,7 @@ def find_count_of_axis(tif_file: TiffFile, axis: str) -> int:
         else:
             return tif_series.shape[index]  # pylint: disable=unsubscriptable-object
 
-    def read_array(self, file_name: str, dtype: np.dtype, z_slice: int) -> np.ndarray:
+    def read_array(self, file_name: Path, dtype: np.dtype, z_slice: int) -> np.ndarray:
         with TiffFile(file_name) as tif_file:
             if self.num_channels is None:
                 self.num_channels = self.read_channel_count(file_name)
@@ -205,14 +206,14 @@ def read_array(self, file_name: str, dtype: np.dtype, z_slice: int) -> np.ndarray:
             data = data.reshape(data.shape + (1,))
         return data
 
-    def read_dimensions(self, file_name: str) -> Tuple[int, int]:
+    def read_dimensions(self, file_name: Path) -> Tuple[int, int]:
         with TiffFile(file_name) as tif_file:
             return (
                 TiffImageReader.find_count_of_axis(tif_file, "X"),
                 TiffImageReader.find_count_of_axis(tif_file, "Y"),
             )
 
-    def read_channel_count(self, file_name: str) -> int:
+    def read_channel_count(self, file_name: Path) -> int:
         with TiffFile(file_name) as tif_file:
             c_count = TiffImageReader.find_count_of_axis(tif_file, "C")
             s_count = TiffImageReader.find_count_of_axis(tif_file, "S")
@@ -224,11 +225,11 @@ def read_channel_count(self, file_name: str) -> int:
         else:
             return c_count
 
-    def read_z_slices_per_file(self, file_name: str) -> int:
+    def read_z_slices_per_file(self, file_name: Path) -> int:
         with TiffFile(file_name) as tif_file:
             return TiffImageReader.find_count_of_axis(tif_file, "Z")
 
-    def read_dtype(self, file_name: str) -> str:
+    def read_dtype(self, file_name: Path) -> str:
         with TiffFile(file_name) as tif_file:
             return tif_file.series[  # pylint: disable=unsubscriptable-object
                 0
@@ -308,7 +309,7 @@ def _read_array_all_channels(
         data = to_target_datatype(data, dtype)
         return data
 
-    def read_array(self, file_name: str, dtype: np.dtype, z_slice: int) -> np.ndarray:
+    def read_array(self, file_name: Path, dtype: np.dtype, z_slice: int) -> np.ndarray:
         with CziFile(file_name) as czi_file:
             channel_index = czi_file.axes.find("C")  # pylint: disable=no-member
             channel_count = self.read_channel_count(file_name)
@@ -341,22 +342,22 @@ def read_array(self, file_name: str, dtype: np.dtype, z_slice: int) -> np.ndarray:
         else:
             return self._read_array_all_channels(czi_file, dtype, z_slice)
 
-    def read_dimensions(self, file_name: str) -> Tuple[int, int]:
+    def read_dimensions(self, file_name: Path) -> Tuple[int, int]:
         with CziFile(file_name) as czi_file:
             return (
                 CziImageReader.find_count_of_axis(czi_file, "X"),
                 CziImageReader.find_count_of_axis(czi_file, "Y"),
             )
 
-    def read_channel_count(self, file_name: str) -> int:
+    def read_channel_count(self, file_name: Path) -> int:
         with CziFile(file_name) as czi_file:
             return CziImageReader.find_count_of_axis(czi_file, "C")
 
-    def read_z_slices_per_file(self, file_name: str) -> int:
+    def read_z_slices_per_file(self, file_name: Path) -> int:
         with CziFile(file_name) as czi_file:
             return CziImageReader.find_count_of_axis(czi_file, "Z")
 
-    def read_dtype(self, file_name: str) -> str:
+    def read_dtype(self, file_name: Path) -> str:
         with CziFile(file_name) as czi_file:
             return czi_file.dtype.name  # pylint: disable=no-member
 
@@ -383,7 +384,7 @@ def __init__(self) -> None:
         ".czi": CziImageReader(),
     }
 
-    def read_array(self, file_name: str, dtype: np.dtype, z_slice: int) -> np.ndarray:
+    def read_array(self, file_name: Path, dtype: np.dtype, z_slice: int) -> np.ndarray:
         _, ext = path.splitext(file_name)
 
         # Image shape will be (x, y, channel_count, z=1) or (x, y, z=1)
@@ -394,19 +395,19 @@ def read_array(self, file_name: str, dtype: np.dtype, z_slice: int) -> np.ndarray:
 
         return image
 
-    def read_dimensions(self, file_name: str) -> Tuple[int, int]:
+    def read_dimensions(self, file_name: Path) -> Tuple[int, int]:
         _, ext = path.splitext(file_name)
         return self.readers[ext].read_dimensions(file_name)
 
-    def read_channel_count(self, file_name: str) -> int:
+    def read_channel_count(self, file_name: Path) -> int:
         _, ext = path.splitext(file_name)
         return self.readers[ext].read_channel_count(file_name)
 
-    def read_z_slices_per_file(self, file_name: str) -> int:
+    def read_z_slices_per_file(self, file_name: Path) -> int:
         _, ext = path.splitext(file_name)
         return self.readers[ext].read_z_slices_per_file(file_name)
 
-    def read_dtype(self, file_name: str) -> str:
+    def read_dtype(self, file_name: Path) -> str:
         _, ext = path.splitext(file_name)
         return self.readers[ext].read_dtype(file_name)
diff --git a/wkcuber/knossos.py b/wkcuber/knossos.py
index 49e2de774..ad0cb456c 100644
--- a/wkcuber/knossos.py
+++ b/wkcuber/knossos.py
@@ -1,5 +1,6 @@
+from pathlib import Path
 from types import TracebackType
-from typing import Tuple, Any, Generator, Iterator, Optional, Type
+from typing import Tuple, Any, Generator, Iterator, Optional, Type, Union
 
 import numpy as np
 import os
@@ -14,8 +15,8 @@
 
 class KnossosDataset:
-    def __init__(self, root: str, dtype: np.dtype = np.uint8):
-        self.root = root
+    def __init__(self, root: Union[str, Path], dtype: np.dtype = np.uint8):
+        self.root = str(root)
         self.dtype = dtype
 
     def read(
@@ -98,8 +99,8 @@ def close(self) -> None:
         pass
 
     @staticmethod
-    def open(root: str, dtype: Optional[np.dtype]) -> "KnossosDataset":
-        return KnossosDataset(root, dtype)
+    def open(root: Union[str, Path], dtype: Optional[np.dtype]) -> "KnossosDataset":
+        return KnossosDataset(str(root), dtype)
 
     def __enter__(self) -> "KnossosDataset":
         return self
diff --git a/wkcuber/metadata.py b/wkcuber/metadata.py
index c34f6cae0..e4ccb4314 100644
--- a/wkcuber/metadata.py
+++ b/wkcuber/metadata.py
@@ -16,14 +16,14 @@
 from os.path import basename, normpath
 
 
-def get_datasource_path(dataset_path: str) -> str:
-    return path.join(dataset_path, "datasource-properties.json")
+def get_datasource_path(dataset_path: Path) -> Path:
+    return dataset_path / "datasource-properties.json"
 
 
 def create_parser() -> ArgumentParser:
     parser = ArgumentParser()
 
-    parser.add_argument("path", help="Directory containing the dataset.")
+    parser.add_argument("path", help="Directory containing the dataset.", type=Path)
 
     parser.add_argument("--name", "-n", help="Name of the dataset", default=None)
 
@@ -45,19 +45,21 @@ def create_parser() -> ArgumentParser:
     return parser
 
 
-def write_datasource_properties(dataset_path: str, datasource_properties: dict) -> None:
+def write_datasource_properties(
+    dataset_path: Path, datasource_properties: dict
+) -> None:
     datasource_properties_path = get_datasource_path(dataset_path)
     with open(datasource_properties_path, "wt") as datasource_properties_file:
         json.dump(datasource_properties, datasource_properties_file, indent=2)
 
 
-def read_datasource_properties(dataset_path: str) -> dict:
+def read_datasource_properties(dataset_path: Path) -> dict:
     with open(get_datasource_path(dataset_path), "r") as datasource_properties_file:
         return json.load(datasource_properties_file)
 
 
 def write_webknossos_metadata(
-    dataset_path: str,
+    dataset_path: Path,
     name: str,
     scale: Tuple[float, float, float],
     max_id: int = 0,
@@ -69,7 +71,7 @@ def write_webknossos_metadata(
     for the given dataset path. Common layers are detected automatically.
     """
     if name == None:
-        name = path.basename(dataset_path)
+        name = dataset_path.name
 
     # Generate a metadata file for webKnossos
     # Currently includes no source of information for team
@@ -87,7 +89,7 @@ def write_webknossos_metadata(
 
 def refresh_metadata(
-    wkw_path: str,
+    wkw_path: Path,
     max_id: int = 0,
     compute_max_id: bool = False,
     exact_bounding_box: Optional[dict] = None,
@@ -147,7 +149,7 @@ def convert_element_class_to_dtype(elementClass: str) -> np.dtype:
 
 def read_metadata_for_layer(
-    wkw_path: str, layer_name: str
+    wkw_path: Path, layer_name: str
 ) -> Tuple[dict, np.dtype, List[int], List[int]]:
     datasource_properties = read_datasource_properties(wkw_path)
 
@@ -179,20 +181,22 @@ def convert_dtype_to_element_class(dtype: np.dtype) -> str:
     return conversion_map.get(dtype, str(dtype))
 
 
-def detect_mag_path(dataset_path: str, layer: str, mag: Mag = Mag(1)) -> Optional[str]:
-    layer_path = path.join(dataset_path, layer, str(mag))
-    if path.exists(layer_path):
+def detect_mag_path(
+    dataset_path: Path, layer: str, mag: Mag = Mag(1)
+) -> Optional[Path]:
+    layer_path = dataset_path / layer / str(mag)
+    if layer_path.exists():
         return layer_path
-    layer_path = path.join(dataset_path, layer, mag.to_long_layer_name())
-    if path.exists(layer_path):
+    layer_path = dataset_path / layer / mag.to_long_layer_name()
+    if layer_path.exists():
         return layer_path
     return None
 
 
-def detect_dtype(dataset_path: str, layer: str, mag: Mag = Mag(1)) -> str:
+def detect_dtype(dataset_path: Path, layer: str, mag: Mag = Mag(1)) -> str:
     layer_path = detect_mag_path(dataset_path, layer, mag)
     if layer_path is not None:
-        with wkw.Dataset.open(layer_path) as dataset:
+        with wkw.Dataset.open(str(layer_path)) as dataset:
             voxel_size = dataset.header.voxel_type
             num_channels = dataset.header.num_channels
             if voxel_size == np.uint8 and num_channels > 1:
@@ -204,17 +208,17 @@ def detect_dtype(dataset_path: str, layer: str, mag: Mag = Mag(1)) -> str:
     )
 
 
-def detect_cubeLength(dataset_path: str, layer: str, mag: Mag = Mag(1)) -> int:
+def detect_cubeLength(dataset_path: Path, layer: str, mag: Mag = Mag(1)) -> int:
     layer_path = detect_mag_path(dataset_path, layer, mag)
     if layer_path is not None:
-        with wkw.Dataset.open(layer_path) as dataset:
+        with wkw.Dataset.open(str(layer_path)) as dataset:
             return dataset.header.block_len * dataset.header.file_len
     raise RuntimeError(
         f"Failed to detect the cube length (for {dataset_path}, {layer}, {mag}) because the layer_path is None"
     )
 
 
-def detect_bbox(dataset_path: str, layer: str, mag: Mag = Mag(1)) -> Optional[dict]:
+def detect_bbox(dataset_path: Path, layer: str, mag: Mag = Mag(1)) -> Optional[dict]:
     # Detect the coarse bounding box of a dataset by iterating
     # over the WKW cubes
     layer_path = detect_mag_path(dataset_path, layer, mag)
@@ -234,7 +238,7 @@ def parse_cube_file_name(filename: str) -> Tuple[int, int, int]:
     def list_cubes(layer_path: str) -> Iterable[Tuple[int, int, int]]:
         return (parse_cube_file_name(f) for f in list_files(layer_path))
 
-    xs, ys, zs = list(zip(*list_cubes(layer_path)))
+    xs, ys, zs = list(zip(*list_cubes(str(layer_path))))
 
     min_x, min_y, min_z = min(xs), min(ys), min(zs)
     max_x, max_y, max_z = max(xs), max(ys), max(zs)
@@ -251,7 +255,7 @@ def list_cubes(layer_path: str) -> Iterable[Tuple[int, int, int]]:
     }
 
 
-def detect_resolutions(dataset_path: str, layer: str) -> Generator:
+def detect_resolutions(dataset_path: Path, layer: str) -> Generator:
     for mag in listdir(path.join(dataset_path, layer)):
         try:
             yield Mag(mag)
@@ -260,7 +264,7 @@
 
 def detect_standard_layer(
-    dataset_path: str,
+    dataset_path: Path,
     layer_name: str,
     exact_bounding_box: Optional[dict] = None,
     category: str = "color",
@@ -313,15 +317,15 @@ def detect_standard_layer(
     }
 
 
-def detect_mappings(dataset_path: str, layer_name: str) -> List[str]:
-    pattern = path.join(dataset_path, layer_name, "mappings", "*.json")
-    mapping_files = glob.glob(pattern)
+def detect_mappings(dataset_path: Path, layer_name: str) -> List[str]:
+    pattern = dataset_path / layer_name / "mappings" / "*.json"
+    mapping_files = glob.glob(str(pattern))
     mapping_file_names = [path.basename(mapping_file) for mapping_file in mapping_files]
 
     return mapping_file_names
 
 
 def detect_segmentation_layer(
-    dataset_path: str,
+    dataset_path: Path,
     layer_name: str,
     max_id: int,
     compute_max_id: bool = False,
@@ -337,7 +341,7 @@ def detect_segmentation_layer(
         logging.info("Computing max id of layer={}".format(layer_name))
         # Computing the current largest segment id
         # This may take very long due to IO load
-        layer_path = detect_mag_path(dataset_path, layer_name, Mag(1))
+        layer_path = str(detect_mag_path(dataset_path, layer_name, Mag(1)))
         with wkw.Dataset.open(layer_path) as dataset:
             bbox = layer_info["boundingBox"]
             layer_info["largestSegmentId"] = int(
@@ -357,7 +361,7 @@ def detect_segmentation_layer(
 
 def detect_layers(
-    dataset_path: str,
+    dataset_path: Path,
     max_id: int,
     compute_max_id: bool,
     exact_bounding_box: Optional[dict] = None,
diff --git a/wkcuber/recubing.py b/wkcuber/recubing.py
index 1eb061c20..20efd4a0d 100644
--- a/wkcuber/recubing.py
+++ b/wkcuber/recubing.py
@@ -1,4 +1,5 @@
 import logging
+from pathlib import Path
 from typing import List, Tuple
 
 import wkw
@@ -24,11 +25,11 @@ def create_parser() -> ArgumentParser:
     parser = ArgumentParser()
 
     parser.add_argument(
-        "source_path", help="Directory containing the datasource properties."
+        "source_path", help="Directory containing the datasource properties.", type=Path
     )
 
     parser.add_argument(
-        "target_path", help="Output directory for the generated dataset."
+ "target_path", help="Output directory for the generated dataset.", type=Path ) parser.add_argument( @@ -73,8 +74,8 @@ def next_higher_divisible_by(number: int, divisor: int) -> int: def recube( - source_path: str, - target_path: str, + source_path: Path, + target_path: Path, layer_name: str, dtype: str, wkw_file_len: int = 32, diff --git a/wkcuber/tile_cubing.py b/wkcuber/tile_cubing.py index 71fa4e062..d747fcb09 100644 --- a/wkcuber/tile_cubing.py +++ b/wkcuber/tile_cubing.py @@ -1,5 +1,7 @@ import time import logging +from pathlib import Path + import numpy as np from typing import Dict, Tuple, Union, List, Optional import os @@ -98,7 +100,7 @@ def get_digit_counts_for_dimensions(pattern: str) -> Dict[str, int]: def detect_interval_for_dimensions( file_path_pattern: str, decimal_lengths: Dict[str, int] -) -> Tuple[Dict[str, int], Dict[str, int], Optional[str], int]: +) -> Tuple[Dict[str, int], Dict[str, int], Optional[Path], int]: arbitrary_file = None file_count = 0 # dictionary that maps the dimension string to the current dimension length @@ -120,7 +122,7 @@ def detect_interval_for_dimensions( found_files = glob(specific_pattern) file_count += len(found_files) for file_name in found_files: - arbitrary_file = file_name + arbitrary_file = Path(file_name) # Turn a pattern {xxx}/{yyy}/{zzzzzz} for given dimension counts into (e.g., 2, 2, 3) into # something like xx/yy/zzz (note that the curly braces are gone) applied_fpp = replace_pattern_to_specific_length_without_brackets( @@ -153,25 +155,29 @@ def find_file_with_dimensions( y_value: int, z_value: int, decimal_lengths: Dict[str, int], -) -> Union[str, None]: - file_path_unpadded = replace_coordinates( - file_path_pattern, {"z": (z_value, 0), "y": (y_value, 0), "x": (x_value, 0)} +) -> Union[Path, None]: + file_path_unpadded = Path( + replace_coordinates( + file_path_pattern, {"z": (z_value, 0), "y": (y_value, 0), "x": (x_value, 0)} + ) ) - file_path_padded = replace_coordinates( - file_path_pattern, - { - "z": (z_value, decimal_lengths["z"]), - "y": (y_value, decimal_lengths["y"]), - "x": (x_value, decimal_lengths["x"]), - }, + file_path_padded = Path( + replace_coordinates( + file_path_pattern, + { + "z": (z_value, decimal_lengths["z"]), + "y": (y_value, decimal_lengths["y"]), + "x": (x_value, decimal_lengths["x"]), + }, + ) ) # the unpadded file pattern has a higher precedence - if os.path.isfile(file_path_unpadded): + if file_path_unpadded.is_file(): return file_path_unpadded - if os.path.isfile(file_path_padded): + if file_path_padded.is_file(): return file_path_padded return None @@ -261,7 +267,7 @@ def tile_cubing_job( def tile_cubing( - target_path: str, + target_path: Path, layer_name: str, batch_size: int, input_path_pattern: str, diff --git a/wkcuber/upsampling.py b/wkcuber/upsampling.py index b04e606e1..a4f382eb2 100644 --- a/wkcuber/upsampling.py +++ b/wkcuber/upsampling.py @@ -1,5 +1,6 @@ from argparse import ArgumentParser, Namespace import os +from pathlib import Path from wkcuber.api.Dataset import WKDataset from .mag import Mag @@ -16,7 +17,7 @@ def create_parser() -> ArgumentParser: parser = ArgumentParser() - parser.add_argument("path", help="Directory containing the dataset.") + parser.add_argument("path", help="Directory containing the dataset.", type=Path) parser.add_argument( "--layer_name", @@ -80,7 +81,7 @@ def create_parser() -> ArgumentParser: def upsample_mags( - path: str, + path: Path, layer_name: str = None, from_mag: Mag = None, target_mag: Mag = Mag(1), @@ -100,7 +101,7 @@ def upsample_mags( if not 
     if not layer_name or not from_mag:
         layer_name = os.path.basename(os.path.dirname(path))
         from_mag = Mag(os.path.basename(path))
-        path = os.path.dirname(os.path.dirname(path))
+        path = path.parent.parent
 
     WKDataset(path).get_layer(layer_name).upsample(
         from_mag=from_mag,
diff --git a/wkcuber/utils.py b/wkcuber/utils.py
index e5061ebe4..f4c044a58 100644
--- a/wkcuber/utils.py
+++ b/wkcuber/utils.py
@@ -2,6 +2,7 @@
 import re
 import time
 from concurrent.futures._base import Future
+from pathlib import Path
 from types import TracebackType
 
 import logging
@@ -55,7 +56,7 @@ def open_wkw(info: WkwDatasetInfo) -> wkw.Dataset:
     ds = wkw.Dataset.open(
-        path.join(info.dataset_path, info.layer_name, str(info.mag)), info.header
+        str(info.dataset_path / info.layer_name / str(info.mag)), info.header
     )
     return ds
 
@@ -270,7 +271,7 @@ def wait_and_ensure_success(futures: List[Future]) -> None:
 
 class BufferedSliceWriter(object):
     def __init__(
         self,
-        dataset_path: str,
+        dataset_path: Path,
         layer_name: str,
         dtype: np.dtype,
         origin: Union[Tuple[int, int, int], List[int]],