single file binary lindi #88

Closed
wants to merge 14 commits into from
2 changes: 1 addition & 1 deletion .github/workflows/linter_checks.yml
@@ -19,4 +19,4 @@ jobs:
- name: Run flake8
run: cd lindi && flake8 --config ../.flake8
- name: Run pyright
run: cd lindi && pyright
run: cd lindi && pyright .
2 changes: 1 addition & 1 deletion .vscode/tasks/test.sh
@@ -5,7 +5,7 @@ set -ex

cd lindi
flake8 .
pyright
pyright .
cd ..

pytest --cov=lindi --cov-report=xml --cov-report=term tests/
20 changes: 20 additions & 0 deletions examples/write_lindi_binary.py
@@ -0,0 +1,20 @@
import lindi


def write_lindi_binary():
with lindi.LindiH5pyFile.from_lindi_file('test.lindi', mode='w', create_binary=True) as f:
f.attrs['test'] = 42
ds = f.create_dataset('data', shape=(1000, 1000), dtype='f4')
ds[...] = 42


def test_read():
f = lindi.LindiH5pyFile.from_lindi_file('test.lindi', mode='r')
print(f.attrs['test'])
print(f['data'][0, 0])
f.close()


if __name__ == "__main__":
write_lindi_binary()
test_read()
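The example above writes a binary `.lindi` file (`create_binary=True`) and reads it back through the same h5py-style API. A minimal sanity check of the round trip might look like the sketch below; it assumes only the `lindi.LindiH5pyFile.from_lindi_file` API used in the example and that the attribute and dataset values written by `write_lindi_binary()` are returned unchanged on read.

```python
import numpy as np
import lindi

# Open the binary lindi file written by write_lindi_binary() above (read-only).
with lindi.LindiH5pyFile.from_lindi_file('test.lindi', mode='r') as f:
    assert f.attrs['test'] == 42            # scalar attribute round-trips
    block = f['data'][:10, :10]             # slice of the (1000, 1000) float32 dataset
    assert block.shape == (10, 10)
    assert np.all(block == 42)              # the writer filled every element with 42
```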
185 changes: 168 additions & 17 deletions lindi/LindiH5ZarrStore/LindiH5ZarrStore.py
@@ -23,6 +23,109 @@
from ..LocalCache.LocalCache import ChunkTooLargeError, LocalCache
from ..LindiRemfile.LindiRemfile import LindiRemfile
from .LindiH5ZarrStoreOpts import LindiH5ZarrStoreOpts
from ..LindiH5pyFile.LindiReferenceFileSystemStore import _get_padded_size, _pad_chunk


class SplitDatasetH5Item:
"""
Represents a dataset that is a single contiguous chunk in the hdf5 file, but
is split into multiple chunks for efficient slicing in the zarr store.
"""
def __init__(self, h5_item, *, contiguous_dataset_max_chunk_size: Union[int, None]):
self._h5_item = h5_item
self._contiguous_dataset_max_chunk_size = contiguous_dataset_max_chunk_size
should_split = False
if contiguous_dataset_max_chunk_size is not None:
codecs = h5_filters_to_codecs(h5_item)
if codecs is None or len(codecs) == 0: # does not have compression
if h5_item.chunks is None or h5_item.chunks == h5_item.shape: # only one chunk
if h5_item.dtype.kind in ['i', 'u', 'f']: # integer or float
size_bytes = int(np.prod(h5_item.shape)) * h5_item.dtype.itemsize
if size_bytes > contiguous_dataset_max_chunk_size: # large enough to split
should_split = True
self._do_split = should_split
if should_split:
size0 = int(np.prod(h5_item.shape[1:])) * h5_item.dtype.itemsize
# We want each chunk to be of size around
# contiguous_dataset_max_chunk_size. So if nn is the size of a chunk
# in the first dimension, then nn * size0 should be approximately
# contiguous_dataset_max_chunk_size. So nn should be approximately
# contiguous_dataset_max_chunk_size // size0
nn = contiguous_dataset_max_chunk_size // size0
if nn == 0:
# The chunk size should not be zero
nn = 1
self._split_chunk_shape = (nn,) + h5_item.shape[1:]
if h5_item.chunks is not None:
zero_chunk_coords = (0,) * h5_item.ndim
try:
byte_offset, byte_count = _get_chunk_byte_range(h5_item, zero_chunk_coords)
except Exception as e:
raise Exception(
f"Error getting byte range for chunk when trying to split contiguous dataset {h5_item.name}: {e}"
)
else:
# Get the byte range in the file for the contiguous dataset
byte_offset, byte_count = _get_byte_range_for_contiguous_dataset(h5_item)
self._split_chunk_byte_offset = byte_offset
self._split_chunk_byte_count = byte_count
self._num_chunks = int(np.prod(h5_item.shape[0:]) + np.prod(self._split_chunk_shape) - 1) // int(np.prod(self._split_chunk_shape))
else:
self._split_chunk_shape = None
self._split_chunk_byte_offset = None
self._split_chunk_byte_count = None
self._num_chunks = None

def get_chunk_byte_range(self, chunk_coords: Tuple[int, ...]):
if len(chunk_coords) != self.ndim:
raise Exception(f"SplitDatasetH5Item: Chunk coordinates {chunk_coords} do not match dataset dimensions")
for i in range(1, len(chunk_coords)):
if chunk_coords[i] != 0:
raise Exception(f"SplitDatasetH5Item: Unexpected non-zero chunk coordinate {chunk_coords[i]}")
if self._split_chunk_byte_offset is None:
raise Exception("SplitDatasetH5Item: Unexpected _split_chunk_byte_offset is None")
if self._split_chunk_shape is None:
raise Exception("SplitDatasetH5Item: Unexpected _split_chunk_shape is None")
chunk_index = chunk_coords[0]
byte_offset = self._split_chunk_byte_offset + chunk_index * int(np.prod(self._split_chunk_shape)) * self.dtype.itemsize
byte_count = int(np.prod(self._split_chunk_shape)) * self.dtype.itemsize
if byte_offset + byte_count > self._split_chunk_byte_offset + self._split_chunk_byte_count:
byte_count = self._split_chunk_byte_offset + self._split_chunk_byte_count - byte_offset
return byte_offset, byte_count

@property
def shape(self):
return self._h5_item.shape

@property
def dtype(self):
return self._h5_item.dtype

@property
def name(self):
return self._h5_item.name

@property
def chunks(self):
if self._do_split:
return self._split_chunk_shape
return self._h5_item.chunks

@property
def ndim(self):
return self._h5_item.ndim

@property
def fillvalue(self):
return self._h5_item.fillvalue

@property
def attrs(self):
return self._h5_item.attrs

@property
def size(self):
return self._h5_item.size


class LindiH5ZarrStore(Store):
@@ -65,6 +168,9 @@ def __init__(
# it when the chunk is requested.
self._inline_arrays: Dict[str, InlineArray] = {}

# For large contiguous arrays, we want to split them into smaller chunks.
self._split_datasets: Dict[str, SplitDatasetH5Item] = {}

self._external_array_links: Dict[str, Union[dict, None]] = {}

@staticmethod
Expand Down Expand Up @@ -118,6 +224,16 @@ def close(self):
self._file = None

def __getitem__(self, key):
val = self._get_helper(key)

if val is not None:
padded_size = _get_padded_size(self, key, val)
if padded_size is not None:
val = _pad_chunk(val, padded_size)

return val

def _get_helper(self, key: str):
"""Get an item from the store (required by base class)."""
parts = [part for part in key.split("/") if part]
if len(parts) == 0:
@@ -180,6 +296,8 @@ def __contains__(self, key):
return False
if not isinstance(h5_item, h5py.Dataset):
return False
if self._split_datasets.get(key_parent, None) is not None:
h5_item = self._split_datasets[key_parent]
external_array_link = self._get_external_array_link(key_parent, h5_item)
if external_array_link is not None:
# The chunk files do not exist for external array links
Expand Down Expand Up @@ -278,7 +396,7 @@ def _get_zgroup_bytes(self, parent_key: str):
zarr.group(store=memory_store)
return reformat_json(memory_store.get(".zgroup"))

def _get_inline_array(self, key: str, h5_dataset: h5py.Dataset):
def _get_inline_array(self, key: str, h5_dataset: Union[h5py.Dataset, SplitDatasetH5Item]):
if key in self._inline_arrays:
return self._inline_arrays[key]
self._inline_arrays[key] = InlineArray(h5_dataset)
@@ -299,6 +417,11 @@ def _get_zarray_bytes(self, parent_key: str):

filters = h5_filters_to_codecs(h5_item)

split_dataset = SplitDatasetH5Item(h5_item, contiguous_dataset_max_chunk_size=self._opts.contiguous_dataset_max_chunk_size)
if split_dataset._do_split:
self._split_datasets[parent_key] = split_dataset
h5_item = split_dataset

# We create a dummy zarr dataset with the appropriate shape, chunks,
# dtype, and filters and then copy the .zarray JSON text from it
memory_store = MemoryStore()
@@ -370,6 +493,9 @@ def _get_chunk_file_bytes_data(self, key_parent: str, key_name: str):
if not isinstance(h5_item, h5py.Dataset):
raise Exception(f"Item {key_parent} is not a dataset")

if self._split_datasets.get(key_parent, None) is not None:
h5_item = self._split_datasets[key_parent]

external_array_link = self._get_external_array_link(key_parent, h5_item)
if external_array_link is not None:
raise Exception(
@@ -418,7 +544,10 @@ def _get_chunk_file_bytes_data(self, key_parent: str, key_name: str):
if h5_item.chunks is not None:
# Get the byte range in the file for the chunk.
try:
byte_offset, byte_count = _get_chunk_byte_range(h5_item, chunk_coords)
if isinstance(h5_item, SplitDatasetH5Item):
byte_offset, byte_count = h5_item.get_chunk_byte_range(chunk_coords)
else:
byte_offset, byte_count = _get_chunk_byte_range(h5_item, chunk_coords)
except Exception as e:
raise Exception(
f"Error getting byte range for chunk {key_parent}/{key_name}. Shape: {h5_item.shape}, Chunks: {h5_item.chunks}, Chunk coords: {chunk_coords}: {e}"
@@ -430,6 +559,8 @@ def _get_chunk_file_bytes_data(self, key_parent: str, key_name: str):
raise Exception(
f"Chunk coordinates {chunk_coords} are not (0, 0, 0, ...) for contiguous dataset {key_parent} with dtype {h5_item.dtype} and shape {h5_item.shape}"
)
if isinstance(h5_item, SplitDatasetH5Item):
raise Exception(f'Unexpected SplitDatasetH5Item for contiguous dataset {key_parent}')
# Get the byte range in the file for the contiguous dataset
byte_offset, byte_count = _get_byte_range_for_contiguous_dataset(h5_item)
return byte_offset, byte_count, None
@@ -440,6 +571,9 @@ def _add_chunk_info_to_refs(self, key_parent: str, add_ref: Callable, add_ref_ch
h5_item = self._h5f.get('/' + key_parent, None)
assert isinstance(h5_item, h5py.Dataset)

if self._split_datasets.get(key_parent, None) is not None:
h5_item = self._split_datasets[key_parent]

# If the shape is (0,), (0, 0), (0, 0, 0), etc., then do not add any chunk references
if np.prod(h5_item.shape) == 0:
return
@@ -467,7 +601,7 @@ def _add_chunk_info_to_refs(self, key_parent: str, add_ref: Callable, add_ref_ch
# does not provide a way to hook in a progress bar
# We use max number of chunks instead of actual number of chunks because get_num_chunks is slow
# for remote datasets.
num_chunks = _get_max_num_chunks(h5_item) # NOTE: unallocated chunks are counted
num_chunks = _get_max_num_chunks(shape=h5_item.shape, chunk_size=h5_item.chunks) # NOTE: unallocated chunks are counted
pbar = tqdm(
total=num_chunks,
desc=f"Writing chunk info for {key_parent}",
@@ -477,24 +611,35 @@ def _add_chunk_info_to_refs(self, key_parent: str, add_ref: Callable, add_ref_ch

chunk_size = h5_item.chunks

def store_chunk_info(chunk_info):
# Get the byte range in the file for each chunk.
chunk_offset: Tuple[int, ...] = chunk_info.chunk_offset
byte_offset = chunk_info.byte_offset
byte_count = chunk_info.size
key_name = ".".join([str(a // b) for a, b in zip(chunk_offset, chunk_size)])
add_ref_chunk(f"{key_parent}/{key_name}", (self._url, byte_offset, byte_count))
pbar.update()
if isinstance(h5_item, SplitDatasetH5Item):
assert h5_item._num_chunks is not None, "Unexpected: _num_chunks is None"
for i in range(h5_item._num_chunks):
chunk_coords = (i,) + (0,) * (h5_item.ndim - 1)
byte_offset, byte_count = h5_item.get_chunk_byte_range(chunk_coords)
key_name = ".".join([str(x) for x in chunk_coords])
add_ref_chunk(f"{key_parent}/{key_name}", (self._url, byte_offset, byte_count))
pbar.update()
else:
def store_chunk_info(chunk_info):
# Get the byte range in the file for each chunk.
chunk_offset: Tuple[int, ...] = chunk_info.chunk_offset
byte_offset = chunk_info.byte_offset
byte_count = chunk_info.size
key_name = ".".join([str(a // b) for a, b in zip(chunk_offset, chunk_size)])
add_ref_chunk(f"{key_parent}/{key_name}", (self._url, byte_offset, byte_count))
pbar.update()

_apply_to_all_chunk_info(h5_item, store_chunk_info)

_apply_to_all_chunk_info(h5_item, store_chunk_info)
pbar.close()
else:
# Get the byte range in the file for the contiguous dataset
assert not isinstance(h5_item, SplitDatasetH5Item), "Unexpected SplitDatasetH5Item for contiguous dataset"
byte_offset, byte_count = _get_byte_range_for_contiguous_dataset(h5_item)
key_name = ".".join("0" for _ in range(h5_item.ndim))
add_ref_chunk(f"{key_parent}/{key_name}", (self._url, byte_offset, byte_count))

def _get_external_array_link(self, parent_key: str, h5_item: h5py.Dataset):
def _get_external_array_link(self, parent_key: str, h5_item: Union[h5py.Dataset, SplitDatasetH5Item]):
# First check the memory cache
if parent_key in self._external_array_links:
return self._external_array_links[parent_key]
@@ -510,7 +655,7 @@ def _get_external_array_link(self, parent_key: str, h5_item: h5py.Dataset):
(shape[i] + chunks[i] - 1) // chunks[i] if chunks[i] != 0 else 0
for i in range(len(shape))
]
num_chunks = np.prod(chunk_coords_shape)
num_chunks = int(np.prod(chunk_coords_shape))
if num_chunks > self._opts.num_dataset_chunks_threshold:
if self._url is not None:
self._external_array_links[parent_key] = {
@@ -663,7 +808,7 @@ def _process_dataset(key, item: h5py.Dataset):


class InlineArray:
def __init__(self, h5_dataset: h5py.Dataset):
def __init__(self, h5_dataset: Union[h5py.Dataset, SplitDatasetH5Item]):
self._additional_zarr_attributes = {}
if h5_dataset.shape == ():
self._additional_zarr_attributes["_SCALAR"] = True
@@ -686,9 +831,15 @@ def __init__(self, h5_dataset: h5py.Dataset):
# For example: [['x', 'uint32'], ['y', 'uint32'], ['weight', 'float32']]
self._additional_zarr_attributes["_COMPOUND_DTYPE"] = compound_dtype
if self._is_inline:
if isinstance(h5_dataset, SplitDatasetH5Item):
raise Exception('SplitDatasetH5Item should not be an inline array')
memory_store = MemoryStore()
dummy_group = zarr.group(store=memory_store)
size_is_zero = np.prod(h5_dataset.shape) == 0
if isinstance(h5_dataset, SplitDatasetH5Item):
h5_item = h5_dataset._h5_item
else:
h5_item = h5_dataset
create_zarr_dataset_from_h5_data(
zarr_parent_group=dummy_group,
name='X',
Expand All @@ -700,8 +851,8 @@ def __init__(self, h5_dataset: h5py.Dataset):
label=f'{h5_dataset.name}',
h5_shape=h5_dataset.shape,
h5_dtype=h5_dataset.dtype,
h5f=h5_dataset.file,
h5_data=h5_dataset[...]
h5f=h5_item.file,
h5_data=h5_item[...]
)
self._zarray_bytes = reformat_json(memory_store['X/.zarray'])
if not size_is_zero:
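To make the splitting arithmetic in `SplitDatasetH5Item` concrete, the following standalone sketch (plain numpy, not the lindi API) reproduces the computation for a hypothetical uncompressed, contiguous float32 dataset; the shape and dtype are invented for illustration.

```python
import numpy as np

# Mirror of the split computation in SplitDatasetH5Item for a hypothetical dataset.
shape = (100_000, 256)                      # contiguous, uncompressed dataset in the HDF5 file
itemsize = np.dtype('float32').itemsize     # 4 bytes
max_chunk_size = 1000 * 1000 * 20           # default contiguous_dataset_max_chunk_size (~20 MB)

size0 = int(np.prod(shape[1:])) * itemsize  # bytes per row along the first dimension: 1024
nn = max(max_chunk_size // size0, 1)        # rows per zarr chunk: 19531
split_chunk_shape = (nn,) + shape[1:]       # (19531, 256), roughly 20 MB per chunk
num_chunks = -(-shape[0] // nn)             # ceiling division: 6 zarr chunks

print(split_chunk_shape, num_chunks)        # (19531, 256) 6
```

Each of those zarr chunks is then served as a byte range into the single contiguous HDF5 chunk via `get_chunk_byte_range`, with the final chunk truncated so it does not run past the end of the dataset.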
7 changes: 7 additions & 0 deletions lindi/LindiH5ZarrStore/LindiH5ZarrStoreOpts.py
@@ -13,5 +13,12 @@ class LindiH5ZarrStoreOpts:
the dataset will be represented as an external array link. If None, then
no datasets will be represented as external array links (equivalent to a
threshold of 0). Default is 1000.

contiguous_dataset_max_chunk_size (Union[int, None]): For large
contiguous arrays in the hdf5 file that are not chunked, this option
specifies the maximum size in bytes of the zarr chunks that will be
created. If None, then the entire array will be represented as a single
chunk. Default is 1000 * 1000 * 20
"""
num_dataset_chunks_threshold: Union[int, None] = 1000
contiguous_dataset_max_chunk_size: Union[int, None] = 1000 * 1000 * 20
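Since the new option is a field on the `LindiH5ZarrStoreOpts` dataclass, overriding it is just a matter of constructing the dataclass with a different value. The sketch below is illustrative only: the import path follows the file layout in this diff, but passing `opts` to `LindiH5ZarrStore.from_file` is assumed rather than confirmed here, and `example.h5` is a placeholder file name.

```python
import lindi
from lindi.LindiH5ZarrStore.LindiH5ZarrStoreOpts import LindiH5ZarrStoreOpts

# Expose contiguous datasets as ~50 MB zarr chunks instead of the 20 MB default;
# the other fields keep their default values.
opts = LindiH5ZarrStoreOpts(contiguous_dataset_max_chunk_size=1000 * 1000 * 50)

# Hypothetical usage: build the store from a local HDF5 file with custom options.
store = lindi.LindiH5ZarrStore.from_file('example.h5', opts=opts)
```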
7 changes: 4 additions & 3 deletions lindi/LindiH5ZarrStore/_util.py
@@ -12,15 +12,16 @@ def _read_bytes(file: IO, offset: int, count: int):
return file.read(count)


def _get_max_num_chunks(h5_dataset: h5py.Dataset):
def _get_max_num_chunks(*, shape, chunk_size):
"""Get the maximum number of chunks in an h5py dataset.

This is similar to h5_dataset.id.get_num_chunks() but significantly faster. It does not account for
whether some chunks are allocated.
"""
chunk_size = h5_dataset.chunks
assert chunk_size is not None
return math.prod([math.ceil(a / b) for a, b in zip(h5_dataset.shape, chunk_size)])
if np.prod(chunk_size) == 0:
return 0
return math.prod([math.ceil(a / b) for a, b in zip(shape, chunk_size)])


def _apply_to_all_chunk_info(h5_dataset: h5py.Dataset, callback: Callable):
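The refactored `_get_max_num_chunks` now takes the shape and chunk size directly (so `_add_chunk_info_to_refs` can pass the values from a `SplitDatasetH5Item` as well as from an h5py dataset) and guards against a zero-sized chunk shape. The following is a standalone recreation of its ceiling-division formula, not imported from lindi, with two worked cases.

```python
import math

def max_num_chunks(shape, chunk_size):
    # Recreation of the formula in _get_max_num_chunks: an upper bound that
    # ignores whether chunks are actually allocated in the HDF5 file.
    if math.prod(chunk_size) == 0:
        return 0  # the new guard: avoids dividing by a zero-length chunk dimension
    return math.prod(math.ceil(a / b) for a, b in zip(shape, chunk_size))

print(max_num_chunks((1000, 1000), (300, 300)))  # ceil(1000/300)**2 = 4 * 4 = 16
print(max_num_chunks((0, 10), (0, 10)))          # 0 (degenerate chunk shape)
```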