From 2a31250989f921fbc33660158739f6e7b025b347 Mon Sep 17 00:00:00 2001
From: rly
Date: Tue, 14 May 2024 08:31:49 -0700
Subject: [PATCH] Simplify method for adding all chunk info

---
 lindi/LindiH5ZarrStore/LindiH5ZarrStore.py | 103 +++++----------------
 lindi/LindiH5ZarrStore/_util.py            |  54 +++++------
 2 files changed, 52 insertions(+), 105 deletions(-)

diff --git a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py
index 7c3a65f..4814c86 100644
--- a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py
+++ b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py
@@ -8,12 +8,10 @@ from tqdm import tqdm
 from ._util import (
     _read_bytes,
-    _get_all_chunk_info,
-    _get_chunk_index,
+    _apply_to_all_chunk_info,
     _get_chunk_byte_range,
     _get_byte_range_for_contiguous_dataset,
     _join,
-    _get_chunk_names_for_dataset,
     _write_rfs_to_file,
 )
 from ..conversion.attr_conversion import h5_to_zarr_attr
@@ -425,13 +423,7 @@ def _get_chunk_file_bytes_data(self, key_parent: str, key_name: str):
             byte_offset, byte_count = _get_byte_range_for_contiguous_dataset(h5_item)
             return byte_offset, byte_count, None
 
-    def _add_chunk_info_to_refs(
-        self,
-        key_parent: str,
-        key_names: List[str],
-        add_ref: Callable,
-        add_ref_chunk: Callable
-    ):
+    def _add_chunk_info_to_refs(self, key_parent: str, add_ref: Callable, add_ref_chunk: Callable):
         if self._h5f is None:
             raise Exception("Store is closed")
         h5_item = self._h5f.get('/' + key_parent, None)
@@ -443,19 +435,12 @@ def _add_chunk_info_to_refs(
                 raise Exception(
                     f"Unable to handle case where chunks is not None but ndim is 0 for dataset {key_parent}"
                 )
-            if len(key_names) != 1 or key_names[0] != "0":
-                raise Exception(
-                    f"Chunk name {key_names[0]} must be '0' for scalar dataset {key_parent}"
-                )
 
         inline_array = self._get_inline_array(key_parent, h5_item)
         if inline_array.is_inline:
-            if len(key_names) != 1 or key_names[0] != inline_array.chunk_fname:
-                raise Exception(
-                    f"Chunk name {key_names[0]} does not match dataset dimensions for inline array {key_parent}"
-                )
+            key_name = inline_array.chunk_fname
             inline_data = inline_array.chunk_bytes
-            add_ref(f"{key_parent}/{key_names[0]}", inline_data)
+            add_ref(f"{key_parent}/{key_name}", inline_data)
             return
 
         # If this is a scalar, then the data should have been inline
@@ -463,62 +448,34 @@ def _add_chunk_info_to_refs(
             raise Exception(f"No inline data for scalar dataset {key_parent}")
 
         if h5_item.chunks is not None:
-            # Get the byte range in the file for each chunk.
-            # Get a list of all the chunk info.
-            chunk_info = _get_all_chunk_info(h5_item)
-            for chunk_index, key_name in tqdm(
-                enumerate(key_names),
-                total=len(key_names),
+            # Set up progress bar for manual updates because h5py chunk_iter used in _apply_to_all_chunk_info
+            # does not provide a way to hook in a progress bar
+            dsid = h5_item.id
+            num_chunks = dsid.get_num_chunks()  # NOTE: this is very slow if dataset is remote and has many chunks
+            pbar = tqdm(
+                total=num_chunks,
                 desc=f"Writing chunk info for {key_parent}",
                 leave=True,
                 delay=2  # do not show progress bar until 2 seconds have passed
-            ):
-                chunk_coords = None  # so that chunk_coords is not unbound on exception
-                try:
-                    # TODO remove this code through the assert after verifying order of key_names
-                    # Get the chunk coords from the file name
-                    chunk_name_parts = key_name.split(".")
-                    if len(chunk_name_parts) != h5_item.ndim:
-                        raise Exception(f"Chunk name {key_name} does not match dataset dimensions")
-                    chunk_coords = tuple(int(x) for x in chunk_name_parts)
-                    for i, c in enumerate(chunk_coords):
-                        if c < 0 or c >= h5_item.shape[i]:
-                            raise Exception(
-                                f"Chunk coordinates {chunk_coords} out of range for dataset {key_parent} with dtype {h5_item.dtype}"
-                            )
-                    assert chunk_index == _get_chunk_index(h5_item, chunk_coords)
-
-                    # use chunk_info if available on this system because it is more efficient,
-                    # otherwise use the slower _get_chunk_byte_range
-                    if chunk_info is not None:
-                        byte_offset = chunk_info[chunk_index].byte_offset
-                        byte_count = chunk_info[chunk_index].size
-                    else:
-                        byte_offset, byte_count = _get_chunk_byte_range(h5_item, chunk_coords)
-                except Exception as e:
-                    raise Exception(
-                        f"Error getting byte range for chunk {key_parent}/{key_name}. Shape: {h5_item.shape}, Chunks: {h5_item.chunks}, Chunk coords: {chunk_coords}: {e}"
-                    )
+            )
 
-                # In this case we reference a chunk of data in a separate file
+            chunk_size = h5_item.chunks
+            def store_chunk_info(chunk_info):
+                # Get the byte range in the file for each chunk.
+                chunk_offset: Tuple[int, ...] = chunk_info.chunk_offset
+                byte_offset = chunk_info.byte_offset
+                byte_count = chunk_info.size
+                key_name = ".".join([str(a // b) for a, b in zip(chunk_offset, chunk_size)])
                 add_ref_chunk(f"{key_parent}/{key_name}", (self._url, byte_offset, byte_count))
+                pbar.update()
+
+            _apply_to_all_chunk_info(h5_item, store_chunk_info)
+            pbar.close()
         else:
-            # In this case (contiguous dataset), we need to check that the chunk
-            # coordinates are (0, 0, 0, ...)
-            if len(key_names) != 1:
-                raise Exception(
-                    f"Contiguous dataset {key_parent} must have exactly one key name, but got {key_names}"
-                )
-            key_name = key_names[0]
-            chunk_coords = tuple(int(x) for x in key_name.split("."))
-            if chunk_coords != (0,) * h5_item.ndim:
-                raise Exception(
-                    f"Chunk coordinates {chunk_coords} are not (0, 0, 0, ...) for contiguous dataset {key_parent} with dtype {h5_item.dtype} and shape {h5_item.shape}"
-                )
             # Get the byte range in the file for the contiguous dataset
             byte_offset, byte_count = _get_byte_range_for_contiguous_dataset(h5_item)
+            key_name = ".".join("0" for _ in range(h5_item.ndim))
             add_ref_chunk(f"{key_parent}/{key_name}", (self._url, byte_offset, byte_count))
-        return byte_offset, byte_count, None
 
     def _get_external_array_link(self, parent_key: str, h5_item: h5py.Dataset):
         # First check the memory cache
@@ -667,19 +624,7 @@ def _process_dataset(key):
             if external_array_link is None:
                 # Only add chunk references for datasets without an external array link
-                shape = zarray_dict["shape"]
-                chunks = zarray_dict.get("chunks", None)
-                chunk_coords_shape = [
-                    # the shape could be zero -- for example dandiset 000559 - acquisition/depth_video/data has shape [0, 0, 0]
-                    (shape[i] + chunks[i] - 1) // chunks[i] if chunks[i] != 0 else 0
-                    for i in range(len(shape))
-                ]
-                # For example, chunk_names could be ['0', '1', '2', ...]
-                # or ['0.0', '0.1', '0.2', ...]
-                chunk_names = _get_chunk_names_for_dataset(
-                    chunk_coords_shape
-                )
-                self._add_chunk_info_to_refs(key, chunk_names, _add_ref, _add_ref_chunk)
+                self._add_chunk_info_to_refs(key, _add_ref, _add_ref_chunk)
 
         # Process the groups recursively starting with the root group
         _process_group("", self._h5f)
 
diff --git a/lindi/LindiH5ZarrStore/_util.py b/lindi/LindiH5ZarrStore/_util.py
index b310084..719a3cf 100644
--- a/lindi/LindiH5ZarrStore/_util.py
+++ b/lindi/LindiH5ZarrStore/_util.py
@@ -1,7 +1,8 @@
-from typing import IO, List, Union
+from typing import IO, List, Callable
 import json
 import numpy as np
 import h5py
+import warnings
 
 
 def _read_bytes(file: IO, offset: int, count: int):
@@ -10,33 +11,43 @@ def _read_bytes(file: IO, offset: int, count: int):
     return file.read(count)
 
 
-def _get_all_chunk_info(h5_dataset: h5py.Dataset) -> Union[list, None]:
-    """Get the chunk info for all the chunks of an h5py dataset as a list of StoreInfo objects.
-    The chunks are in order such that the last dimension changes the fastest, e.g., chunk coordinates could be:
+def _apply_to_all_chunk_info(h5_dataset: h5py.Dataset, callback: Callable):
+    """Apply the callback function to each chunk of an h5py dataset.
+    The chunks are iterated in order such that the last dimension changes the fastest,
+    e.g., chunk coordinates could be:
     [0, 0, 0], [0, 0, 1], [0, 0, 2], ..., [0, 1, 0], [0, 1, 1], [0, 1, 2], ..., [1, 0, 0], [1, 0, 1], [1, 0, 2], ...
 
-    Use stinfo[i].byte_offset and stinfo[i].size to get the byte range in the file for the i-th chunk.
+    This method tries to use the `chunk_iter` method if it is available. The `chunk_iter` method requires
+    HDF5 1.12.3 and above. If it is not available, this method falls back to the `get_chunk_info` method,
+    which is significantly slower and not recommended if the dataset has many chunks.
 
-    Requires HDF5 1.12.3 and above. If the chunk_iter method is not available, return None.
+    `chunk_iter` takes 1-5 seconds for all chunks for a dataset with 1e6 chunks.
+    `get_chunk_info` takes about 0.2 seconds per chunk for a dataset with 1e6 chunks.
 
-    This takes 1-5 seconds for a dataset with 1e6 chunks.
-
-    This might be very slow if the dataset is stored remotely.
+    NOTE: This method might be very slow if the dataset is stored remotely.
""" - stinfo = list() + assert h5_dataset.chunks is not None dsid = h5_dataset.id try: - dsid.chunk_iter(stinfo.append) + dsid.chunk_iter(callback) except AttributeError: # chunk_iter is not available - return None - return stinfo + num_chunks = dsid.get_num_chunks() # NOTE: this is very slow if dataset is remote and has many chunks + if num_chunks > 100: + warnings.warn( + f"Dataset {h5_dataset.name} has {num_chunks} chunks. Using get_chunk_info is slow. " + f"Consider upgrading to HDF5 1.12.3 or above for faster performance." + ) + for index in range(num_chunks): + chunk_info = dsid.get_chunk_info(index) + callback(chunk_info) -def _get_chunk_index(h5_dataset: h5py.Dataset, chunk_coords: tuple) -> int: - """Get the chunk index for a chunk of an h5py dataset. +def _get_chunk_byte_range(h5_dataset: h5py.Dataset, chunk_coords: tuple) -> tuple: + """Get the byte range in the file for a chunk of an h5py dataset. - This involves some low-level functions from the h5py library. + This involves some low-level functions from the h5py library. First we need + to get the chunk index. Then we call _get_chunk_byte_range_for_chunk_index. """ shape = h5_dataset.shape chunk_shape = h5_dataset.chunks @@ -52,16 +63,6 @@ def _get_chunk_index(h5_dataset: h5py.Dataset, chunk_coords: tuple) -> int: chunk_index = 0 for i in range(ndim): chunk_index += int(chunk_coords[i] * np.prod(chunk_coords_shape[i + 1:])) - return chunk_index - - -def _get_chunk_byte_range(h5_dataset: h5py.Dataset, chunk_coords: tuple) -> tuple: - """Get the byte range in the file for a chunk of an h5py dataset. - - This involves some low-level functions from the h5py library. First we need - to get the chunk index. Then we call _get_chunk_byte_range_for_chunk_index. - """ - chunk_index = _get_chunk_index(h5_dataset, chunk_coords) return _get_chunk_byte_range_for_chunk_index(h5_dataset, chunk_index) @@ -99,6 +100,7 @@ def _join(a: str, b: str) -> str: return f"{a}/{b}" +# NOTE: this is no longer used def _get_chunk_names_for_dataset(chunk_coords_shape: List[int]) -> List[str]: """Get the chunk names for a dataset with the given chunk coords shape.