Commit

Simplify method for adding all chunk info
rly committed May 14, 2024
1 parent 3652ebb commit 2a31250
Showing 2 changed files with 52 additions and 105 deletions.
103 changes: 24 additions & 79 deletions lindi/LindiH5ZarrStore/LindiH5ZarrStore.py
@@ -8,12 +8,10 @@
from tqdm import tqdm
from ._util import (
_read_bytes,
_get_all_chunk_info,
_get_chunk_index,
_apply_to_all_chunk_info,
_get_chunk_byte_range,
_get_byte_range_for_contiguous_dataset,
_join,
_get_chunk_names_for_dataset,
_write_rfs_to_file,
)
from ..conversion.attr_conversion import h5_to_zarr_attr
Expand Down Expand Up @@ -425,13 +423,7 @@ def _get_chunk_file_bytes_data(self, key_parent: str, key_name: str):
byte_offset, byte_count = _get_byte_range_for_contiguous_dataset(h5_item)
return byte_offset, byte_count, None

def _add_chunk_info_to_refs(
self,
key_parent: str,
key_names: List[str],
add_ref: Callable,
add_ref_chunk: Callable
):
def _add_chunk_info_to_refs(self, key_parent: str, add_ref: Callable, add_ref_chunk: Callable):
if self._h5f is None:
raise Exception("Store is closed")
h5_item = self._h5f.get('/' + key_parent, None)
@@ -443,82 +435,47 @@ def _add_chunk_info_to_refs(
raise Exception(
f"Unable to handle case where chunks is not None but ndim is 0 for dataset {key_parent}"
)
if len(key_names) != 1 or key_names[0] != "0":
raise Exception(
f"Chunk name {key_names[0]} must be '0' for scalar dataset {key_parent}"
)

inline_array = self._get_inline_array(key_parent, h5_item)
if inline_array.is_inline:
if len(key_names) != 1 or key_names[0] != inline_array.chunk_fname:
raise Exception(
f"Chunk name {key_names[0]} does not match dataset dimensions for inline array {key_parent}"
)
key_name = inline_array.chunk_fname
inline_data = inline_array.chunk_bytes
add_ref(f"{key_parent}/{key_names[0]}", inline_data)
add_ref(f"{key_parent}/{key_name}", inline_data)
return

# If this is a scalar, then the data should have been inline
if h5_item.ndim == 0:
raise Exception(f"No inline data for scalar dataset {key_parent}")

if h5_item.chunks is not None:
# Get the byte range in the file for each chunk.
# Get a list of all the chunk info.
chunk_info = _get_all_chunk_info(h5_item)
for chunk_index, key_name in tqdm(
enumerate(key_names),
total=len(key_names),
# Set up progress bar for manual updates because h5py chunk_iter used in _apply_to_all_chunk_info
# does not provide a way to hook in a progress bar
dsid = h5_item.id
num_chunks = dsid.get_num_chunks() # NOTE: this is very slow if dataset is remote and has many chunks
pbar = tqdm(
total=num_chunks,
desc=f"Writing chunk info for {key_parent}",
leave=True,
delay=2 # do not show progress bar until 2 seconds have passed
):
chunk_coords = None # so that chunk_coords is not unbound on exception
try:
# TODO remove this code through the assert after verifying order of key_names
# Get the chunk coords from the file name
chunk_name_parts = key_name.split(".")
if len(chunk_name_parts) != h5_item.ndim:
raise Exception(f"Chunk name {key_name} does not match dataset dimensions")
chunk_coords = tuple(int(x) for x in chunk_name_parts)
for i, c in enumerate(chunk_coords):
if c < 0 or c >= h5_item.shape[i]:
raise Exception(
f"Chunk coordinates {chunk_coords} out of range for dataset {key_parent} with dtype {h5_item.dtype}"
)
assert chunk_index == _get_chunk_index(h5_item, chunk_coords)

# use chunk_info if available on this system because it is more efficient,
# otherwise use the slower _get_chunk_byte_range
if chunk_info is not None:
byte_offset = chunk_info[chunk_index].byte_offset
byte_count = chunk_info[chunk_index].size
else:
byte_offset, byte_count = _get_chunk_byte_range(h5_item, chunk_coords)
except Exception as e:
raise Exception(
f"Error getting byte range for chunk {key_parent}/{key_name}. Shape: {h5_item.shape}, Chunks: {h5_item.chunks}, Chunk coords: {chunk_coords}: {e}"
)
)

# In this case we reference a chunk of data in a separate file
chunk_size = h5_item.chunks
def store_chunk_info(chunk_info):
# Get the byte range in the file for each chunk.
chunk_offset: Tuple[int, ...] = chunk_info.chunk_offset
byte_offset = chunk_info.byte_offset
byte_count = chunk_info.size
key_name = ".".join([str(a // b) for a, b in zip(chunk_offset, chunk_size)])
add_ref_chunk(f"{key_parent}/{key_name}", (self._url, byte_offset, byte_count))
pbar.update()

_apply_to_all_chunk_info(h5_item, store_chunk_info)
pbar.close()
else:
# In this case (contiguous dataset), we need to check that the chunk
# coordinates are (0, 0, 0, ...)
if len(key_names) != 1:
raise Exception(
f"Contiguous dataset {key_parent} must have exactly one key name, but got {key_names}"
)
key_name = key_names[0]
chunk_coords = tuple(int(x) for x in key_name.split("."))
if chunk_coords != (0,) * h5_item.ndim:
raise Exception(
f"Chunk coordinates {chunk_coords} are not (0, 0, 0, ...) for contiguous dataset {key_parent} with dtype {h5_item.dtype} and shape {h5_item.shape}"
)
# Get the byte range in the file for the contiguous dataset
byte_offset, byte_count = _get_byte_range_for_contiguous_dataset(h5_item)
key_name = ".".join("0" for _ in range(h5_item.ndim))
add_ref_chunk(f"{key_parent}/{key_name}", (self._url, byte_offset, byte_count))
return byte_offset, byte_count, None

def _get_external_array_link(self, parent_key: str, h5_item: h5py.Dataset):
# First check the memory cache
Expand Down Expand Up @@ -667,19 +624,7 @@ def _process_dataset(key):

if external_array_link is None:
# Only add chunk references for datasets without an external array link
shape = zarray_dict["shape"]
chunks = zarray_dict.get("chunks", None)
chunk_coords_shape = [
# the shape could be zero -- for example dandiset 000559 - acquisition/depth_video/data has shape [0, 0, 0]
(shape[i] + chunks[i] - 1) // chunks[i] if chunks[i] != 0 else 0
for i in range(len(shape))
]
# For example, chunk_names could be ['0', '1', '2', ...]
# or ['0.0', '0.1', '0.2', ...]
chunk_names = _get_chunk_names_for_dataset(
chunk_coords_shape
)
self._add_chunk_info_to_refs(key, chunk_names, _add_ref, _add_ref_chunk)
self._add_chunk_info_to_refs(key, _add_ref, _add_ref_chunk)

# Process the groups recursively starting with the root group
_process_group("", self._h5f)
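
To make the new flow easier to follow outside the diff, here is a minimal, self-contained sketch of the callback-based pattern this commit adopts: instead of enumerating chunk names up front and looking up a byte range per name, a callback is applied to every chunk via the low-level dataset id, with a manually updated tqdm progress bar. This is illustrative only, not the library's code: the function name collect_chunk_refs, the refs dict, and the local example file are assumptions, while chunk_iter, get_num_chunks, get_chunk_info, and the StoreInfo fields are the h5py calls referenced in the diff.

import h5py
import numpy as np
from tqdm import tqdm


def collect_chunk_refs(h5_dataset: h5py.Dataset, url: str) -> dict:
    # Map zarr-style chunk keys ("0.0", "0.1", ...) to (url, byte_offset, byte_count),
    # mirroring what add_ref_chunk receives in _add_chunk_info_to_refs.
    assert h5_dataset.chunks is not None
    chunk_size = h5_dataset.chunks
    dsid = h5_dataset.id
    refs = {}
    # Manual progress bar, since chunk_iter offers no hook for one
    pbar = tqdm(total=dsid.get_num_chunks(), desc="Writing chunk info", leave=True, delay=2)

    def store_chunk_info(chunk_info):
        # chunk_info is an h5py StoreInfo with chunk_offset, byte_offset, and size
        key_name = ".".join(str(a // b) for a, b in zip(chunk_info.chunk_offset, chunk_size))
        refs[key_name] = (url, chunk_info.byte_offset, chunk_info.size)
        pbar.update()

    try:
        dsid.chunk_iter(store_chunk_info)  # fast path, requires HDF5 1.12.3+
    except AttributeError:
        # slow fallback: one get_chunk_info call per chunk
        for index in range(dsid.get_num_chunks()):
            store_chunk_info(dsid.get_chunk_info(index))
    pbar.close()
    return refs


# Example usage on a small local file (hypothetical path)
with h5py.File("example.h5", "w") as f:
    f.create_dataset("data", data=np.arange(100).reshape(10, 10), chunks=(5, 5))
with h5py.File("example.h5", "r") as f:
    print(collect_chunk_refs(f["data"], url="example.h5"))

In LindiH5ZarrStore itself the callback calls add_ref_chunk directly, and the try/except fallback lives in _apply_to_all_chunk_info in _util.py, shown below.
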
54 changes: 28 additions & 26 deletions lindi/LindiH5ZarrStore/_util.py
@@ -1,7 +1,8 @@
from typing import IO, List, Union
from typing import IO, List, Callable
import json
import numpy as np
import h5py
import warnings


def _read_bytes(file: IO, offset: int, count: int):
@@ -10,33 +11,43 @@ def _read_bytes(file: IO, offset: int, count: int):
return file.read(count)


def _get_all_chunk_info(h5_dataset: h5py.Dataset) -> Union[list, None]:
"""Get the chunk info for all the chunks of an h5py dataset as a list of StoreInfo objects.
The chunks are in order such that the last dimension changes the fastest, e.g., chunk coordinates could be:
def _apply_to_all_chunk_info(h5_dataset: h5py.Dataset, callback: Callable):
"""Apply the callback function to each chunk of an h5py dataset.
The chunks are iterated in order such that the last dimension changes the fastest,
e.g., chunk coordinates could be:
[0, 0, 0], [0, 0, 1], [0, 0, 2], ..., [0, 1, 0], [0, 1, 1], [0, 1, 2], ..., [1, 0, 0], [1, 0, 1], [1, 0, 2], ...
Use stinfo[i].byte_offset and stinfo[i].size to get the byte range in the file for the i-th chunk.
This method tries to use the `chunk_iter` method if it is available. The `chunk_iter` method requires
HDF5 1.12.3 and above. If it is not available, this method falls back to the `get_chunk_info` method,
which is significantly slower and not recommended if the dataset has many chunks.
Requires HDF5 1.12.3 and above. If the chunk_iter method is not available, return None.
`chunk_iter` takes 1-5 seconds for all chunks for a dataset with 1e6 chunks.
`get_chunk_info` takes about 0.2 seconds per chunk for a dataset with 1e6 chunks.
This takes 1-5 seconds for a dataset with 1e6 chunks.
This might be very slow if the dataset is stored remotely.
NOTE: This method might be very slow if the dataset is stored remotely.
"""
stinfo = list()
assert h5_dataset.chunks is not None
dsid = h5_dataset.id
try:
dsid.chunk_iter(stinfo.append)
dsid.chunk_iter(callback)
except AttributeError:
# chunk_iter is not available
return None
return stinfo
num_chunks = dsid.get_num_chunks() # NOTE: this is very slow if dataset is remote and has many chunks
if num_chunks > 100:
warnings.warn(
f"Dataset {h5_dataset.name} has {num_chunks} chunks. Using get_chunk_info is slow. "
f"Consider upgrading to HDF5 1.12.3 or above for faster performance."
)
for index in range(num_chunks):
chunk_info = dsid.get_chunk_info(index)
callback(chunk_info)


def _get_chunk_index(h5_dataset: h5py.Dataset, chunk_coords: tuple) -> int:
"""Get the chunk index for a chunk of an h5py dataset.
def _get_chunk_byte_range(h5_dataset: h5py.Dataset, chunk_coords: tuple) -> tuple:
"""Get the byte range in the file for a chunk of an h5py dataset.
This involves some low-level functions from the h5py library.
This involves some low-level functions from the h5py library. First we need
to get the chunk index. Then we call _get_chunk_byte_range_for_chunk_index.
"""
shape = h5_dataset.shape
chunk_shape = h5_dataset.chunks
@@ -52,16 +63,6 @@ def _get_chunk_index(h5_dataset: h5py.Dataset, chunk_coords: tuple) -> int:
chunk_index = 0
for i in range(ndim):
chunk_index += int(chunk_coords[i] * np.prod(chunk_coords_shape[i + 1:]))
return chunk_index


def _get_chunk_byte_range(h5_dataset: h5py.Dataset, chunk_coords: tuple) -> tuple:
"""Get the byte range in the file for a chunk of an h5py dataset.
This involves some low-level functions from the h5py library. First we need
to get the chunk index. Then we call _get_chunk_byte_range_for_chunk_index.
"""
chunk_index = _get_chunk_index(h5_dataset, chunk_coords)
return _get_chunk_byte_range_for_chunk_index(h5_dataset, chunk_index)


@@ -99,6 +100,7 @@ def _join(a: str, b: str) -> str:
return f"{a}/{b}"


# NOTE: this is no longer used
def _get_chunk_names_for_dataset(chunk_coords_shape: List[int]) -> List[str]:
"""Get the chunk names for a dataset with the given chunk coords shape.

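For reference, the chunk-index computation that _get_chunk_byte_range performs before calling _get_chunk_byte_range_for_chunk_index works as follows. This is a minimal sketch assuming the ceiling-division grid shape shown in the diff; the standalone function name chunk_index_from_coords is hypothetical and not part of _util.py.

import numpy as np


def chunk_index_from_coords(shape, chunk_shape, chunk_coords):
    # Number of chunks along each dimension (ceiling division), then a
    # row-major (last dimension fastest) linear index over the chunk grid.
    ndim = len(shape)
    chunk_coords_shape = [(shape[i] + chunk_shape[i] - 1) // chunk_shape[i] for i in range(ndim)]
    chunk_index = 0
    for i in range(ndim):
        chunk_index += int(chunk_coords[i] * np.prod(chunk_coords_shape[i + 1:]))
    return chunk_index


# A (10, 10) dataset with (5, 5) chunks has a 2 x 2 grid of chunks;
# chunk (1, 0) is the third chunk in row-major order (index 2).
assert chunk_index_from_coords((10, 10), (5, 5), (1, 0)) == 2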