diff --git a/.github/workflows/linter_checks.yml b/.github/workflows/linter_checks.yml index acf2726..87ffa72 100644 --- a/.github/workflows/linter_checks.yml +++ b/.github/workflows/linter_checks.yml @@ -19,4 +19,4 @@ jobs: - name: Run flake8 run: cd lindi && flake8 --config ../.flake8 - name: Run pyright - run: cd lindi && pyright + run: cd lindi && pyright . diff --git a/.vscode/tasks/test.sh b/.vscode/tasks/test.sh index c87184e..3d01bfa 100755 --- a/.vscode/tasks/test.sh +++ b/.vscode/tasks/test.sh @@ -5,7 +5,7 @@ set -ex cd lindi flake8 . -pyright +pyright . cd .. pytest --cov=lindi --cov-report=xml --cov-report=term tests/ diff --git a/examples/write_lindi_binary.py b/examples/write_lindi_binary.py new file mode 100644 index 0000000..cdb518b --- /dev/null +++ b/examples/write_lindi_binary.py @@ -0,0 +1,20 @@ +import lindi + + +def write_lindi_binary(): + with lindi.LindiH5pyFile.from_lindi_file('test.lindi', mode='w', create_binary=True) as f: + f.attrs['test'] = 42 + ds = f.create_dataset('data', shape=(1000, 1000), dtype='f4') + ds[...] = 42 + + +def test_read(): + f = lindi.LindiH5pyFile.from_lindi_file('test.lindi', mode='r') + print(f.attrs['test']) + print(f['data'][0, 0]) + f.close() + + +if __name__ == "__main__": + write_lindi_binary() + test_read() diff --git a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py index bafbcaf..6ff58a2 100644 --- a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py +++ b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py @@ -23,6 +23,109 @@ from ..LocalCache.LocalCache import ChunkTooLargeError, LocalCache from ..LindiRemfile.LindiRemfile import LindiRemfile from .LindiH5ZarrStoreOpts import LindiH5ZarrStoreOpts +from ..LindiH5pyFile.LindiReferenceFileSystemStore import _get_padded_size, _pad_chunk + + +class SplitDatasetH5Item: + """ + Represents a dataset that is a single contiguous chunk in the hdf5 file, but + is split into multiple chunks for efficient slicing in the zarr store. + """ + def __init__(self, h5_item, *, contiguous_dataset_max_chunk_size: Union[int, None]): + self._h5_item = h5_item + self._contiguous_dataset_max_chunk_size = contiguous_dataset_max_chunk_size + should_split = False + if contiguous_dataset_max_chunk_size is not None: + codecs = h5_filters_to_codecs(h5_item) + if codecs is None or len(codecs) == 0: # does not have compression + if h5_item.chunks is None or h5_item.chunks == h5_item.shape: # only one chunk + if h5_item.dtype.kind in ['i', 'u', 'f']: # integer or float + size_bytes = int(np.prod(h5_item.shape)) * h5_item.dtype.itemsize + if size_bytes > contiguous_dataset_max_chunk_size: # large enough to split + should_split = True + self._do_split = should_split + if should_split: + size0 = int(np.prod(h5_item.shape[1:])) * h5_item.dtype.itemsize + # We want each chunk to be of size around + # contiguous_dataset_max_chunk_size. So if nn is the size of a chunk + # in the first dimension, then nn * size0 should be approximately + # contiguous_dataset_max_chunk_size. 
So nn should be approximately + # contiguous_dataset_max_chunk_size // size0 + nn = contiguous_dataset_max_chunk_size // size0 + if nn == 0: + # The chunk size should not be zero + nn = 1 + self._split_chunk_shape = (nn,) + h5_item.shape[1:] + if h5_item.chunks is not None: + zero_chunk_coords = (0,) * h5_item.ndim + try: + byte_offset, byte_count = _get_chunk_byte_range(h5_item, zero_chunk_coords) + except Exception as e: + raise Exception( + f"Error getting byte range for chunk when trying to split contiguous dataset {h5_item.name}: {e}" + ) + else: + # Get the byte range in the file for the contiguous dataset + byte_offset, byte_count = _get_byte_range_for_contiguous_dataset(h5_item) + self._split_chunk_byte_offset = byte_offset + self._split_chunk_byte_count = byte_count + self._num_chunks = int(np.prod(h5_item.shape[0:]) + np.prod(self._split_chunk_shape) - 1) // int(np.prod(self._split_chunk_shape)) + else: + self._split_chunk_shape = None + self._split_chunk_byte_offset = None + self._split_chunk_byte_count = None + self._num_chunks = None + + def get_chunk_byte_range(self, chunk_coords: Tuple[int, ...]): + if len(chunk_coords) != self.ndim: + raise Exception(f"SplitDatasetH5Item: Chunk coordinates {chunk_coords} do not match dataset dimensions") + for i in range(1, len(chunk_coords)): + if chunk_coords[i] != 0: + raise Exception(f"SplitDatasetH5Item: Unexpected non-zero chunk coordinate {chunk_coords[i]}") + if self._split_chunk_byte_offset is None: + raise Exception("SplitDatasetH5Item: Unexpected _split_chunk_byte_offset is None") + if self._split_chunk_shape is None: + raise Exception("SplitDatasetH5Item: Unexpected _split_chunk_shape is None") + chunk_index = chunk_coords[0] + byte_offset = self._split_chunk_byte_offset + chunk_index * int(np.prod(self._split_chunk_shape)) * self.dtype.itemsize + byte_count = int(np.prod(self._split_chunk_shape)) * self.dtype.itemsize + if byte_offset + byte_count > self._split_chunk_byte_offset + self._split_chunk_byte_count: + byte_count = self._split_chunk_byte_offset + self._split_chunk_byte_count - byte_offset + return byte_offset, byte_count + + @property + def shape(self): + return self._h5_item.shape + + @property + def dtype(self): + return self._h5_item.dtype + + @property + def name(self): + return self._h5_item.name + + @property + def chunks(self): + if self._do_split: + return self._split_chunk_shape + return self._h5_item.chunks + + @property + def ndim(self): + return self._h5_item.ndim + + @property + def fillvalue(self): + return self._h5_item.fillvalue + + @property + def attrs(self): + return self._h5_item.attrs + + @property + def size(self): + return self._h5_item.size class LindiH5ZarrStore(Store): @@ -65,6 +168,9 @@ def __init__( # it when the chunk is requested. self._inline_arrays: Dict[str, InlineArray] = {} + # For large contiguous arrays, we want to split them into smaller chunks. 
+ self._split_datasets: Dict[str, SplitDatasetH5Item] = {} + self._external_array_links: Dict[str, Union[dict, None]] = {} @staticmethod @@ -118,6 +224,16 @@ def close(self): self._file = None def __getitem__(self, key): + val = self._get_helper(key) + + if val is not None: + padded_size = _get_padded_size(self, key, val) + if padded_size is not None: + val = _pad_chunk(val, padded_size) + + return val + + def _get_helper(self, key: str): """Get an item from the store (required by base class).""" parts = [part for part in key.split("/") if part] if len(parts) == 0: @@ -180,6 +296,8 @@ def __contains__(self, key): return False if not isinstance(h5_item, h5py.Dataset): return False + if self._split_datasets.get(key_parent, None) is not None: + h5_item = self._split_datasets[key_parent] external_array_link = self._get_external_array_link(key_parent, h5_item) if external_array_link is not None: # The chunk files do not exist for external array links @@ -278,7 +396,7 @@ def _get_zgroup_bytes(self, parent_key: str): zarr.group(store=memory_store) return reformat_json(memory_store.get(".zgroup")) - def _get_inline_array(self, key: str, h5_dataset: h5py.Dataset): + def _get_inline_array(self, key: str, h5_dataset: Union[h5py.Dataset, SplitDatasetH5Item]): if key in self._inline_arrays: return self._inline_arrays[key] self._inline_arrays[key] = InlineArray(h5_dataset) @@ -299,6 +417,11 @@ def _get_zarray_bytes(self, parent_key: str): filters = h5_filters_to_codecs(h5_item) + split_dataset = SplitDatasetH5Item(h5_item, contiguous_dataset_max_chunk_size=self._opts.contiguous_dataset_max_chunk_size) + if split_dataset._do_split: + self._split_datasets[parent_key] = split_dataset + h5_item = split_dataset + # We create a dummy zarr dataset with the appropriate shape, chunks, # dtype, and filters and then copy the .zarray JSON text from it memory_store = MemoryStore() @@ -370,6 +493,9 @@ def _get_chunk_file_bytes_data(self, key_parent: str, key_name: str): if not isinstance(h5_item, h5py.Dataset): raise Exception(f"Item {key_parent} is not a dataset") + if self._split_datasets.get(key_parent, None) is not None: + h5_item = self._split_datasets[key_parent] + external_array_link = self._get_external_array_link(key_parent, h5_item) if external_array_link is not None: raise Exception( @@ -418,7 +544,10 @@ def _get_chunk_file_bytes_data(self, key_parent: str, key_name: str): if h5_item.chunks is not None: # Get the byte range in the file for the chunk. try: - byte_offset, byte_count = _get_chunk_byte_range(h5_item, chunk_coords) + if isinstance(h5_item, SplitDatasetH5Item): + byte_offset, byte_count = h5_item.get_chunk_byte_range(chunk_coords) + else: + byte_offset, byte_count = _get_chunk_byte_range(h5_item, chunk_coords) except Exception as e: raise Exception( f"Error getting byte range for chunk {key_parent}/{key_name}. Shape: {h5_item.shape}, Chunks: {h5_item.chunks}, Chunk coords: {chunk_coords}: {e}" @@ -430,6 +559,8 @@ def _get_chunk_file_bytes_data(self, key_parent: str, key_name: str): raise Exception( f"Chunk coordinates {chunk_coords} are not (0, 0, 0, ...) 
for contiguous dataset {key_parent} with dtype {h5_item.dtype} and shape {h5_item.shape}" ) + if isinstance(h5_item, SplitDatasetH5Item): + raise Exception(f'Unexpected SplitDatasetH5Item for contiguous dataset {key_parent}') # Get the byte range in the file for the contiguous dataset byte_offset, byte_count = _get_byte_range_for_contiguous_dataset(h5_item) return byte_offset, byte_count, None @@ -440,6 +571,9 @@ def _add_chunk_info_to_refs(self, key_parent: str, add_ref: Callable, add_ref_ch h5_item = self._h5f.get('/' + key_parent, None) assert isinstance(h5_item, h5py.Dataset) + if self._split_datasets.get(key_parent, None) is not None: + h5_item = self._split_datasets[key_parent] + # If the shape is (0,), (0, 0), (0, 0, 0), etc., then do not add any chunk references if np.prod(h5_item.shape) == 0: return @@ -467,7 +601,7 @@ def _add_chunk_info_to_refs(self, key_parent: str, add_ref: Callable, add_ref_ch # does not provide a way to hook in a progress bar # We use max number of chunks instead of actual number of chunks because get_num_chunks is slow # for remote datasets. - num_chunks = _get_max_num_chunks(h5_item) # NOTE: unallocated chunks are counted + num_chunks = _get_max_num_chunks(shape=h5_item.shape, chunk_size=h5_item.chunks) # NOTE: unallocated chunks are counted pbar = tqdm( total=num_chunks, desc=f"Writing chunk info for {key_parent}", @@ -477,24 +611,35 @@ def _add_chunk_info_to_refs(self, key_parent: str, add_ref: Callable, add_ref_ch chunk_size = h5_item.chunks - def store_chunk_info(chunk_info): - # Get the byte range in the file for each chunk. - chunk_offset: Tuple[int, ...] = chunk_info.chunk_offset - byte_offset = chunk_info.byte_offset - byte_count = chunk_info.size - key_name = ".".join([str(a // b) for a, b in zip(chunk_offset, chunk_size)]) - add_ref_chunk(f"{key_parent}/{key_name}", (self._url, byte_offset, byte_count)) - pbar.update() + if isinstance(h5_item, SplitDatasetH5Item): + assert h5_item._num_chunks is not None, "Unexpected: _num_chunks is None" + for i in range(h5_item._num_chunks): + chunk_coords = (i,) + (0,) * (h5_item.ndim - 1) + byte_offset, byte_count = h5_item.get_chunk_byte_range(chunk_coords) + key_name = ".".join([str(x) for x in chunk_coords]) + add_ref_chunk(f"{key_parent}/{key_name}", (self._url, byte_offset, byte_count)) + pbar.update() + else: + def store_chunk_info(chunk_info): + # Get the byte range in the file for each chunk. + chunk_offset: Tuple[int, ...] 
= chunk_info.chunk_offset + byte_offset = chunk_info.byte_offset + byte_count = chunk_info.size + key_name = ".".join([str(a // b) for a, b in zip(chunk_offset, chunk_size)]) + add_ref_chunk(f"{key_parent}/{key_name}", (self._url, byte_offset, byte_count)) + pbar.update() + + _apply_to_all_chunk_info(h5_item, store_chunk_info) - _apply_to_all_chunk_info(h5_item, store_chunk_info) pbar.close() else: # Get the byte range in the file for the contiguous dataset + assert not isinstance(h5_item, SplitDatasetH5Item), "Unexpected SplitDatasetH5Item for contiguous dataset" byte_offset, byte_count = _get_byte_range_for_contiguous_dataset(h5_item) key_name = ".".join("0" for _ in range(h5_item.ndim)) add_ref_chunk(f"{key_parent}/{key_name}", (self._url, byte_offset, byte_count)) - def _get_external_array_link(self, parent_key: str, h5_item: h5py.Dataset): + def _get_external_array_link(self, parent_key: str, h5_item: Union[h5py.Dataset, SplitDatasetH5Item]): # First check the memory cache if parent_key in self._external_array_links: return self._external_array_links[parent_key] @@ -510,7 +655,7 @@ def _get_external_array_link(self, parent_key: str, h5_item: h5py.Dataset): (shape[i] + chunks[i] - 1) // chunks[i] if chunks[i] != 0 else 0 for i in range(len(shape)) ] - num_chunks = np.prod(chunk_coords_shape) + num_chunks = int(np.prod(chunk_coords_shape)) if num_chunks > self._opts.num_dataset_chunks_threshold: if self._url is not None: self._external_array_links[parent_key] = { @@ -663,7 +808,7 @@ def _process_dataset(key, item: h5py.Dataset): class InlineArray: - def __init__(self, h5_dataset: h5py.Dataset): + def __init__(self, h5_dataset: Union[h5py.Dataset, SplitDatasetH5Item]): self._additional_zarr_attributes = {} if h5_dataset.shape == (): self._additional_zarr_attributes["_SCALAR"] = True @@ -686,9 +831,15 @@ def __init__(self, h5_dataset: h5py.Dataset): # For example: [['x', 'uint32'], ['y', 'uint32'], ['weight', 'float32']] self._additional_zarr_attributes["_COMPOUND_DTYPE"] = compound_dtype if self._is_inline: + if isinstance(h5_dataset, SplitDatasetH5Item): + raise Exception('SplitDatasetH5Item should not be an inline array') memory_store = MemoryStore() dummy_group = zarr.group(store=memory_store) size_is_zero = np.prod(h5_dataset.shape) == 0 + if isinstance(h5_dataset, SplitDatasetH5Item): + h5_item = h5_dataset._h5_item + else: + h5_item = h5_dataset create_zarr_dataset_from_h5_data( zarr_parent_group=dummy_group, name='X', @@ -700,8 +851,8 @@ def __init__(self, h5_dataset: h5py.Dataset): label=f'{h5_dataset.name}', h5_shape=h5_dataset.shape, h5_dtype=h5_dataset.dtype, - h5f=h5_dataset.file, - h5_data=h5_dataset[...] + h5f=h5_item.file, + h5_data=h5_item[...] ) self._zarray_bytes = reformat_json(memory_store['X/.zarray']) if not size_is_zero: diff --git a/lindi/LindiH5ZarrStore/LindiH5ZarrStoreOpts.py b/lindi/LindiH5ZarrStore/LindiH5ZarrStoreOpts.py index 40fe998..d8ea82e 100644 --- a/lindi/LindiH5ZarrStore/LindiH5ZarrStoreOpts.py +++ b/lindi/LindiH5ZarrStore/LindiH5ZarrStoreOpts.py @@ -13,5 +13,12 @@ class LindiH5ZarrStoreOpts: the dataset will be represented as an external array link. If None, then no datasets will be represented as external array links (equivalent to a threshold of 0). Default is 1000. + + contiguous_dataset_max_chunk_size (Union[int, None]): For large + contiguous arrays in the hdf5 file that are not chunked, this option + specifies the maximum size in bytes of the zarr chunks that will be + created. 
If None, then the entire array will be represented as a single + chunk. Default is 1000 * 1000 * 20 """ num_dataset_chunks_threshold: Union[int, None] = 1000 + contiguous_dataset_max_chunk_size: Union[int, None] = 1000 * 1000 * 20 diff --git a/lindi/LindiH5ZarrStore/_util.py b/lindi/LindiH5ZarrStore/_util.py index 0badbae..681164a 100644 --- a/lindi/LindiH5ZarrStore/_util.py +++ b/lindi/LindiH5ZarrStore/_util.py @@ -12,15 +12,16 @@ def _read_bytes(file: IO, offset: int, count: int): return file.read(count) -def _get_max_num_chunks(h5_dataset: h5py.Dataset): +def _get_max_num_chunks(*, shape, chunk_size): """Get the maximum number of chunks in an h5py dataset. This is similar to h5_dataset.id.get_num_chunks() but significantly faster. It does not account for whether some chunks are allocated. """ - chunk_size = h5_dataset.chunks assert chunk_size is not None - return math.prod([math.ceil(a / b) for a, b in zip(h5_dataset.shape, chunk_size)]) + if np.prod(chunk_size) == 0: + return 0 + return math.prod([math.ceil(a / b) for a, b in zip(shape, chunk_size)]) def _apply_to_all_chunk_info(h5_dataset: h5py.Dataset, callback: Callable): diff --git a/lindi/LindiH5pyFile/LindiH5pyFile.py b/lindi/LindiH5pyFile/LindiH5pyFile.py index 5968a34..6ad66fc 100644 --- a/lindi/LindiH5pyFile/LindiH5pyFile.py +++ b/lindi/LindiH5pyFile/LindiH5pyFile.py @@ -2,7 +2,6 @@ import os import json import tempfile -import urllib.request import h5py import zarr from zarr.storage import Store as ZarrStore @@ -15,11 +14,14 @@ from ..LindiStagingStore.StagingArea import StagingArea from ..LindiStagingStore.LindiStagingStore import LindiStagingStore, _apply_templates from ..LindiH5ZarrStore.LindiH5ZarrStoreOpts import LindiH5ZarrStoreOpts +from ..lindi1 import Lindi1Store from ..LocalCache.LocalCache import LocalCache from ..LindiH5ZarrStore._util import _write_rfs_to_file +from ..lindi1._lindi1 import _load_rfs_dict_from_local_lindi_file, _load_rfs_dict_from_remote_lindi_file, _write_rfs_dict_to_lindi1_file, _create_empty_lindi_file + LindiFileMode = Literal["r", "r+", "w", "w-", "x", "a"] @@ -29,7 +31,7 @@ class LindiH5pyFile(h5py.File): - def __init__(self, _zarr_group: zarr.Group, *, _zarr_store: Union[ZarrStore, None] = None, _mode: LindiFileMode = "r", _local_cache: Union[LocalCache, None] = None, _local_file_path: Union[str, None] = None): + def __init__(self, _zarr_group: zarr.Group, *, _zarr_store: Union[ZarrStore, None] = None, _mode: LindiFileMode = "r", _local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_is_lindi1_format: Union[bool, None] = None): """ Do not use this constructor directly. 
Instead, use: from_lindi_file, from_h5py_file, from_reference_file_system, from_zarr_store, or @@ -40,22 +42,26 @@ def __init__(self, _zarr_group: zarr.Group, *, _zarr_store: Union[ZarrStore, Non self._mode: LindiFileMode = _mode self._the_group = LindiH5pyGroup(_zarr_group, self) self._local_cache = _local_cache - self._local_file_path = _local_file_path + self._source_url_or_path = _source_url_or_path + self._source_is_lindi1_format = _source_is_lindi1_format # see comment in LindiH5pyGroup self._id = f'{id(self._zarr_group)}/' @staticmethod - def from_lindi_file(url_or_path: str, *, mode: LindiFileMode = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None, local_file_path: Union[str, None] = None): + def from_lindi_file(url_or_path: str, *, mode: LindiFileMode = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None, create_binary: bool = False): """ Create a LindiH5pyFile from a URL or path to a .lindi.json file. For a description of parameters, see from_reference_file_system(). """ - if local_file_path is None: - if not url_or_path.startswith("http://") and not url_or_path.startswith("https://"): - local_file_path = url_or_path - return LindiH5pyFile.from_reference_file_system(url_or_path, mode=mode, staging_area=staging_area, local_cache=local_cache, local_file_path=local_file_path) + return LindiH5pyFile.from_reference_file_system( + url_or_path, + mode=mode, + staging_area=staging_area, + local_cache=local_cache, + create_binary=create_binary + ) @staticmethod def from_hdf5_file( @@ -95,11 +101,13 @@ def from_hdf5_file( return LindiH5pyFile.from_zarr_store( zarr_store=zarr_store, mode=mode, - local_cache=local_cache + local_cache=local_cache, + _source_url_or_path=None, + _source_is_lindi1_format=None ) @staticmethod - def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMode = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None, local_file_path: Union[str, None] = None): + def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMode = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_is_lindi1_format: Union[bool, None] = None, create_binary: bool = False): """ Create a LindiH5pyFile from a reference file system. @@ -116,11 +124,11 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMo is only used in write mode, by default None. local_cache : Union[LocalCache, None], optional The local cache to use for caching data, by default None. - local_file_path : Union[str, None], optional - If rfs is not a string or is a remote url, this is the path to the - local file for the purpose of writing to it. It is required in this - case if mode is not "r". If rfs is a string and not a remote url, it - must be equal to local_file_path if provided. + _source_url_or_path : Union[str, None], optional + _source_is_lindi1_format : Union[bool, None], optional create_binary : + bool, optional + Only applies when writing a new file. If True, a binary lindi file + will be created. 
""" if rfs is None: rfs = { @@ -131,26 +139,32 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMo }, } + if create_binary: + if mode not in ["w", "w-", "x"]: + raise Exception("create_binary is only supported in write mode") + if isinstance(rfs, str): rfs_is_url = rfs.startswith("http://") or rfs.startswith("https://") - if local_file_path is not None and not rfs_is_url and rfs != local_file_path: - raise Exception(f"rfs is not a remote url, so local_file_path must be the same as rfs, but got: {rfs} and {local_file_path}") if rfs_is_url: - with tempfile.TemporaryDirectory() as tmpdir: - filename = f"{tmpdir}/temp.lindi.json" - _download_file(rfs, filename) - with open(filename, "r") as f: - data = json.load(f) - assert isinstance(data, dict) # prevent infinite recursion - return LindiH5pyFile.from_reference_file_system(data, mode=mode, staging_area=staging_area, local_cache=local_cache, local_file_path=local_file_path) + if create_binary: + raise Exception("create_binary is not supported with remote files") + data, is_lindi1_format = _load_rfs_dict_from_remote_lindi_file(rfs) + assert isinstance(data, dict) # prevent infinite recursion + if _source_url_or_path is not None: + if _source_url_or_path != rfs: + raise Exception(f"source_url_or_path must match rfs if rfs is a remote file, got: {rfs}, {_source_url_or_path}") + if _source_is_lindi1_format is not None: + if _source_is_lindi1_format != is_lindi1_format: + raise Exception(f"source_is_lindi1_format must match is_lindi1_format if rfs is a remote file, got: {is_lindi1_format}, {_source_is_lindi1_format}") + return LindiH5pyFile.from_reference_file_system( + data, + mode=mode, + staging_area=staging_area, + local_cache=local_cache, + _source_url_or_path=rfs, + _source_is_lindi1_format=is_lindi1_format + ) else: - empty_rfs = { - "refs": { - '.zgroup': { - 'zarr_format': 2 - } - }, - } if mode == "r": # Readonly, file must exist (default) if not os.path.exists(rfs): @@ -161,36 +175,59 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMo raise Exception(f"File does not exist: {rfs}") elif mode == "w": # Create file, truncate if exists - with open(rfs, "w") as f: - json.dump(empty_rfs, f) + _create_empty_lindi_file(rfs, create_binary=create_binary) elif mode in ["w-", "x"]: # Create file, fail if exists if os.path.exists(rfs): raise Exception(f"File already exists: {rfs}") - with open(rfs, "w") as f: - json.dump(empty_rfs, f) + _create_empty_lindi_file(rfs, create_binary=create_binary) elif mode == "a": # Read/write if exists, create otherwise if os.path.exists(rfs): with open(rfs, "r") as f: data = json.load(f) + else: + _create_empty_lindi_file(rfs, create_binary=create_binary) else: raise Exception(f"Unhandled mode: {mode}") - with open(rfs, "r") as f: - data = json.load(f) + data, is_lindi1_format = _load_rfs_dict_from_local_lindi_file(rfs) assert isinstance(data, dict) # prevent infinite recursion - return LindiH5pyFile.from_reference_file_system(data, mode=mode, staging_area=staging_area, local_cache=local_cache, local_file_path=local_file_path) + if _source_url_or_path is not None: + if _source_url_or_path != rfs: + raise Exception(f"source_url_or_path must match rfs if rfs is a local file, got: {rfs}, {_source_url_or_path}") + if _source_is_lindi1_format is not None: + if _source_is_lindi1_format != is_lindi1_format: + raise Exception(f"source_is_lindi1_format must match is_lindi1_format if rfs is a local file, got: {is_lindi1_format}, {_source_is_lindi1_format}") + 
return LindiH5pyFile.from_reference_file_system( + data, mode=mode, + staging_area=staging_area, + local_cache=local_cache, + _source_url_or_path=rfs, + _source_is_lindi1_format=is_lindi1_format + ) elif isinstance(rfs, dict): # This store does not need to be closed - store = LindiReferenceFileSystemStore(rfs, local_cache=local_cache) + store = LindiReferenceFileSystemStore(rfs, local_cache=local_cache, _source_url_or_path=_source_url_or_path) if staging_area: + if _source_is_lindi1_format: + raise Exception("Staging area is not supported with lindi1 format") store = LindiStagingStore(base_store=store, staging_area=staging_area) - return LindiH5pyFile.from_zarr_store(store, mode=mode, local_file_path=local_file_path, local_cache=local_cache) + elif _source_is_lindi1_format and _source_url_or_path is not None: + is_url = _source_url_or_path.startswith("http://") or _source_url_or_path.startswith("https://") + if not is_url: + store = Lindi1Store(base_store=store, lindi1_file_name=_source_url_or_path) + return LindiH5pyFile.from_zarr_store( + store, + mode=mode, + local_cache=local_cache, + _source_url_or_path=_source_url_or_path, + _source_is_lindi1_format=_source_is_lindi1_format + ) else: raise Exception(f"Unhandled type for rfs: {type(rfs)}") @staticmethod - def from_zarr_store(zarr_store: ZarrStore, mode: LindiFileMode = "r", local_cache: Union[LocalCache, None] = None, local_file_path: Union[str, None] = None): + def from_zarr_store(zarr_store: ZarrStore, mode: LindiFileMode = "r", local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_is_lindi1_format: Union[bool, None] = None): """ Create a LindiH5pyFile from a zarr store. @@ -207,10 +244,17 @@ def from_zarr_store(zarr_store: ZarrStore, mode: LindiFileMode = "r", local_cach # does not need to be closed zarr_group = zarr.open(store=zarr_store, mode=mode) assert isinstance(zarr_group, zarr.Group) - return LindiH5pyFile.from_zarr_group(zarr_group, _zarr_store=zarr_store, mode=mode, local_cache=local_cache, local_file_path=local_file_path) + return LindiH5pyFile.from_zarr_group( + zarr_group, + _zarr_store=zarr_store, + mode=mode, + local_cache=local_cache, + _source_url_or_path=_source_url_or_path, + _source_is_lindi1_format=_source_is_lindi1_format + ) @staticmethod - def from_zarr_group(zarr_group: zarr.Group, *, mode: LindiFileMode = "r", _zarr_store: Union[ZarrStore, None] = None, local_cache: Union[LocalCache, None] = None, local_file_path: Union[str, None] = None): + def from_zarr_group(zarr_group: zarr.Group, *, mode: LindiFileMode = "r", _zarr_store: Union[ZarrStore, None] = None, local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_is_lindi1_format: Union[bool, None] = None): """ Create a LindiH5pyFile from a zarr group. @@ -228,7 +272,7 @@ def from_zarr_group(zarr_group: zarr.Group, *, mode: LindiFileMode = "r", _zarr_ See from_zarr_store(). 
""" - return LindiH5pyFile(zarr_group, _zarr_store=_zarr_store, _mode=mode, _local_cache=local_cache, _local_file_path=local_file_path) + return LindiH5pyFile(zarr_group, _zarr_store=_zarr_store, _mode=mode, _local_cache=local_cache, _source_url_or_path=_source_url_or_path, _source_is_lindi1_format=_source_is_lindi1_format) def to_reference_file_system(self): """ @@ -241,6 +285,8 @@ def to_reference_file_system(self): if isinstance(zarr_store, LindiStagingStore): zarr_store.consolidate_chunks() zarr_store = zarr_store._base_store + if isinstance(zarr_store, Lindi1Store): + zarr_store = zarr_store._base_store if isinstance(zarr_store, LindiH5ZarrStore): return zarr_store.to_reference_file_system() if not isinstance(zarr_store, LindiReferenceFileSystemStore): @@ -284,8 +330,8 @@ def upload( if isinstance(v, list) and len(v) == 3: url = _apply_templates(v[0], rfs.get('templates', {})) if not url.startswith("http://") and not url.startswith("https://"): - local_path = url - blobs_to_upload.add(local_path) + local_path_for_blob = url + blobs_to_upload.add(local_path_for_blob) # Upload each of the local blobs using the given upload function and get a mapping from # the original file paths to the URLs of the uploaded files blob_mapping = _upload_blobs(blobs_to_upload, on_upload_blob=on_upload_blob) @@ -358,9 +404,14 @@ def close(self): self.flush() def flush(self): - if self._mode != 'r' and self._local_file_path is not None: - rfs = self.to_reference_file_system() - _write_rfs_to_file(rfs=rfs, output_file_name=self._local_file_path) + if self._mode != 'r' and self._source_url_or_path is not None: + is_url = self._source_url_or_path.startswith("http://") or self._source_url_or_path.startswith("https://") + if not is_url: + rfs = self.to_reference_file_system() + if self._source_is_lindi1_format: + _write_rfs_dict_to_lindi1_file(rfs=rfs, output_file_name=self._source_url_or_path) + else: + _write_rfs_to_file(rfs=rfs, output_file_name=self._source_url_or_path) def __enter__(self): # type: ignore return self @@ -446,6 +497,12 @@ def get(self, name, default=None, getclass=False, getlink=False): raise Exception("Getting class is not allowed") return self._get_item(name, getlink=getlink, default=default) + def keys(self): # type: ignore + return self._the_group.keys() + + def items(self): + return self._the_group.items() + def __iter__(self): return self._the_group.__iter__() @@ -506,16 +563,6 @@ def staging_store(self): return store -def _download_file(url: str, filename: str) -> None: - headers = { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3" - } - req = urllib.request.Request(url, headers=headers) - with urllib.request.urlopen(req) as response: - with open(filename, "wb") as f: - f.write(response.read()) - - def _recursive_copy(src_item: Union[h5py.Group, h5py.Dataset], dest: h5py.File, name: str) -> None: if isinstance(src_item, h5py.Group): dst_item = dest.create_group(name) diff --git a/lindi/LindiH5pyFile/LindiReferenceFileSystemStore.py b/lindi/LindiH5pyFile/LindiReferenceFileSystemStore.py index 7b51fcc..018597a 100644 --- a/lindi/LindiH5pyFile/LindiReferenceFileSystemStore.py +++ b/lindi/LindiH5pyFile/LindiReferenceFileSystemStore.py @@ -1,6 +1,7 @@ from typing import Literal, Dict, Union import json import base64 +import numpy as np import requests from zarr.storage import Store as ZarrStore @@ -68,7 +69,7 @@ class LindiReferenceFileSystemStore(ZarrStore): It is okay for rfs to be modified outside of this 
class, and the changes will be reflected immediately in the store. """ - def __init__(self, rfs: dict, *, mode: Literal["r", "r+"] = "r+", local_cache: Union[LocalCache, None] = None): + def __init__(self, rfs: dict, *, mode: Literal["r", "r+"] = "r+", local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None): """ Create a LindiReferenceFileSystemStore. @@ -113,6 +114,7 @@ def __init__(self, rfs: dict, *, mode: Literal["r", "r+"] = "r+", local_cache: U self.rfs = rfs self.mode = mode self.local_cache = local_cache + self._source_url_or_path = _source_url_or_path # These methods are overridden from MutableMapping def __contains__(self, key: object): @@ -121,6 +123,16 @@ def __contains__(self, key: object): return key in self.rfs["refs"] def __getitem__(self, key: str): + val = self._get_helper(key) + + if val is not None: + padded_size = _get_padded_size(self, key, val) + if padded_size is not None: + val = _pad_chunk(val, padded_size) + + return val + + def _get_helper(self, key: str): if key not in self.rfs["refs"]: raise KeyError(key) x = self.rfs["refs"][key] @@ -144,6 +156,9 @@ def __getitem__(self, key: str): x = self.local_cache.get_remote_chunk(url=url, offset=offset, size=length) if x is not None: return x + if url == '.' and self._source_url_or_path: + # this is where the file refers to bytes in the same file (lindi binary format) + url = self._source_url_or_path val = _read_bytes_from_url_or_path(url, offset, length) if self.local_cache is not None: try: @@ -263,3 +278,48 @@ def _read_bytes_from_url_or_path(url_or_path: str, offset: int, length: int): with open(url_or_path, 'rb') as f: f.seek(offset) return f.read(length) + + +def _is_chunk_base_key(base_key: str) -> bool: + a = base_key.split('.') + if len(a) == 0: + return False + for x in a: + # check if integer + try: + int(x) + except ValueError: + return False + return True + + +def _get_itemsize(dtype: str) -> int: + d = np.dtype(dtype) + return d.itemsize + + +def _pad_chunk(data: bytes, expected_chunk_size: int) -> bytes: + return data + b'\0' * (expected_chunk_size - len(data)) + + +def _get_padded_size(store, key: str, val: bytes): + # If the key is a chunk and it's smaller than the expected size, then we + # need to pad it with zeros. This can happen if this is the final chunk + # in a contiguous hdf5 dataset. See + # https://github.com/NeurodataWithoutBorders/lindi/pull/84 + base_key = key.split('/')[-1] + if val and _is_chunk_base_key(base_key): + parent_key = key.split('/')[:-1] + zarray_key = '/'.join(parent_key) + '/.zarray' + if zarray_key in store: + zarray_json = store.__getitem__(zarray_key) + assert isinstance(zarray_json, bytes) + zarray = json.loads(zarray_json) + chunk_shape = zarray['chunks'] + dtype = zarray['dtype'] + if np.dtype(dtype).kind in ['i', 'u', 'f']: + expected_chunk_size = int(np.prod(chunk_shape)) * _get_itemsize(dtype) + if len(val) < expected_chunk_size: + return expected_chunk_size + + return None diff --git a/lindi/conversion/create_zarr_dataset_from_h5_data.py b/lindi/conversion/create_zarr_dataset_from_h5_data.py index fa2fe45..669c517 100644 --- a/lindi/conversion/create_zarr_dataset_from_h5_data.py +++ b/lindi/conversion/create_zarr_dataset_from_h5_data.py @@ -117,7 +117,7 @@ def create_zarr_dataset_from_h5_data( # than 1 million elements. This is because zarr may default to # suboptimal chunking. Note that the default for h5py is to use the # entire dataset as a single chunk. 
- total_size = np.prod(h5_shape) if len(h5_shape) > 0 else 1 + total_size = int(np.prod(h5_shape)) if len(h5_shape) > 0 else 1 if total_size > 1000 * 1000: raise Exception(f'Chunks must be specified explicitly when writing dataset of shape {h5_shape}') # Note that we are not using the same filters as in the h5py dataset diff --git a/lindi/lindi1/Lindi1Store.py b/lindi/lindi1/Lindi1Store.py new file mode 100644 index 0000000..34b083d --- /dev/null +++ b/lindi/lindi1/Lindi1Store.py @@ -0,0 +1,82 @@ +import os +from zarr.storage import Store as ZarrStore +from ..LindiH5pyFile.LindiReferenceFileSystemStore import LindiReferenceFileSystemStore + + +class Lindi1Store(ZarrStore): + """ + A Zarr store that allows supplementing a base LindiReferenceFileSystemStore + where the large data blobs are appended to a lindi file of lindi1 format. + """ + def __init__(self, *, base_store: LindiReferenceFileSystemStore, lindi1_file_name: str): + """ + Create a LindiStagingStore. + + Parameters + ---------- + base_store : LindiReferenceFileSystemStore + The base store that this store supplements. + lindi1_file_name : str + The name of the lindi1 file that will be created or appended to. + """ + self._base_store = base_store + self._lindi1_file_name = lindi1_file_name + + def __getitem__(self, key: str): + return self._base_store.__getitem__(key) + + def __setitem__(self, key: str, value: bytes): + key_parts = key.split("/") + key_base_name = key_parts[-1] + if key_base_name.startswith('.') or key_base_name.endswith('.json'): # always inline .zattrs, .zgroup, .zarray, zarr.json + inline = True + else: + # presumably it is a chunk of an array + if not isinstance(value, bytes): + raise ValueError("Value must be bytes") + size = len(value) + inline = size < 1000 # this should be a configurable threshold + if inline: + # If inline, save in memory + return self._base_store.__setitem__(key, value) + else: + # If not inline, append it to the lindi1 file + key_without_initial_slash = key if not key.startswith("/") else key[1:] + lindi1_file_size = os.path.getsize(self._lindi1_file_name) + with open(self._lindi1_file_name, "ab") as f: + f.write(value) + self._set_ref_reference(key_without_initial_slash, '.', lindi1_file_size, len(value)) + + def __delitem__(self, key: str): + # We can't delete the file from the lindi1 file, but we do need to remove the reference + return self._base_store.__delitem__(key) + + def __iter__(self): + return self._base_store.__iter__() + + def __len__(self): + return self._base_store.__len__() + + # These methods are overridden from BaseStore + def is_readable(self): + return True + + def is_writeable(self): + return True + + def is_listable(self): + return True + + def is_erasable(self): + return False + + def _set_ref_reference(self, key: str, filename: str, offset: int, size: int): + rfs = self._base_store.rfs + if 'refs' not in rfs: + # this shouldn't happen, but we'll be defensive + rfs['refs'] = {} + rfs['refs'][key] = [ + filename, + offset, + size + ] diff --git a/lindi/lindi1/__init__.py b/lindi/lindi1/__init__.py new file mode 100644 index 0000000..8dd16f6 --- /dev/null +++ b/lindi/lindi1/__init__.py @@ -0,0 +1 @@ +from .Lindi1Store import Lindi1Store # noqa: F401 diff --git a/lindi/lindi1/_lindi1.py b/lindi/lindi1/_lindi1.py new file mode 100644 index 0000000..5db139f --- /dev/null +++ b/lindi/lindi1/_lindi1.py @@ -0,0 +1,177 @@ +import os +import json +import tempfile +import urllib.request + + +def _load_rfs_dict_from_local_lindi_file(filename: str): + header_buf = 
_read_bytes_from_file(filename, 0, 1024) + lindi1_header = _parse_lindi1_header(header_buf) + if lindi1_header is not None: + # This is lindi1 format + assert lindi1_header['format'] == 'lindi1' + rfs_start = lindi1_header['rfs_start'] + rfs_size = lindi1_header['rfs_size'] + rfs_buf = _read_bytes_from_file(filename, rfs_start, rfs_start + rfs_size) + return json.loads(rfs_buf), True + else: + # In this case, it must be a regular json file + with open(filename, "r") as f: + return json.load(f), False + + +def _load_rfs_dict_from_remote_lindi_file(url: str): + file_size = _get_file_size_of_remote_file(url) + if file_size < 1024 * 1024 * 2: + # if it's a small file, we'll just download the whole thing + with tempfile.TemporaryDirectory() as tmpdir: + tmp_fname = f"{tmpdir}/temp.lindi.json" + _download_file(url, tmp_fname) + return _load_rfs_dict_from_local_lindi_file(tmp_fname) + else: + # if it's a large file, we start by downloading the first 1024 bytes + header_buf = _download_file_byte_range(url, 0, 1024) + lindi1_header = _parse_lindi1_header(header_buf) + if lindi1_header is not None: + # This is lindi1 format + assert lindi1_header['format'] == 'lindi1' + rfs_start = lindi1_header['rfs_start'] + rfs_size = lindi1_header['rfs_size'] + rfs_buf = _download_file_byte_range(url, rfs_start, rfs_start + rfs_size) + return json.loads(rfs_buf), True + else: + # In this case, it must be a regular json file + with tempfile.TemporaryDirectory() as tmpdir: + tmp_fname = f"{tmpdir}/temp.lindi.json" + _download_file(url, tmp_fname) + with open(tmp_fname, "r") as f: + return json.load(f), False + + +def _write_rfs_dict_to_lindi1_file(*, rfs: dict, output_file_name: str): + rfs_buf = json.dumps(rfs).encode("utf-8") + header_buf = _read_bytes_from_file(output_file_name, 0, 1024) + lindi1_header = _parse_lindi1_header(header_buf) + assert lindi1_header is not None + assert lindi1_header['format'] == 'lindi1' + old_rfs_start = lindi1_header['rfs_start'] + old_rfs_size = lindi1_header['rfs_size'] + old_rfs_padding = lindi1_header['rfs_padding'] + if len(rfs_buf) < old_rfs_size + old_rfs_padding - 1: + # we don't need to allocate a new space in the file for the rfs + rfs_buf_padded = rfs_buf + b"\0" * (old_rfs_size + old_rfs_padding - len(rfs_buf)) + _write_bytes_within_file(output_file_name, old_rfs_start, rfs_buf_padded) + new_rfs_start = old_rfs_start + new_rfs_size = len(rfs_buf) + new_rfs_padding = old_rfs_size + old_rfs_padding - len(rfs_buf) + else: + # we need to allocate a new space. 
First zero out the old space + zeros = b"\0" * (old_rfs_size + old_rfs_padding) + _write_bytes_within_file(output_file_name, old_rfs_start, zeros) + file_size = os.path.getsize(output_file_name) + new_rfs_start = file_size + # determine size of new space, to be double the needed size + new_rfs_size = len(rfs_buf) * 2 + new_rfs_padding = new_rfs_size - len(rfs_buf) + new_rfs_buf_padded = rfs_buf + b"\0" * new_rfs_padding + # write the new rfs + _append_bytes_to_file(output_file_name, new_rfs_buf_padded) + new_lindi1_header = { + **lindi1_header, + "rfs_start": new_rfs_start, + "rfs_size": new_rfs_size, + "rfs_padding": new_rfs_padding + } + new_lindi1_header_buf = json.dumps(new_lindi1_header).encode("utf-8") + if len(new_lindi1_header_buf) > 1024: + raise Exception("New header is too long") + new_lindi1_header_buf_padded = new_lindi1_header_buf + b"\0" * (1024 - len(new_lindi1_header_buf)) + _write_bytes_within_file(output_file_name, 0, new_lindi1_header_buf_padded) + + +def _download_file(url: str, filename: str) -> None: + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3" + } + req = urllib.request.Request(url, headers=headers) + with urllib.request.urlopen(req) as response: + with open(filename, "wb") as f: + f.write(response.read()) + + +def _download_file_byte_range(url: str, start: int, end: int) -> bytes: + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3", + "Range": f"bytes={start}-{end - 1}" + } + req = urllib.request.Request(url, headers=headers) + with urllib.request.urlopen(req) as response: + return response.read() + + +def _read_bytes_from_file(filename: str, start: int, end: int) -> bytes: + with open(filename, "rb") as f: + f.seek(start) + return f.read(end - start) + + +def _write_bytes_within_file(filename: str, start: int, buf: bytes) -> None: + with open(filename, "r+b") as f: + f.seek(start) + f.write(buf) + + +def _append_bytes_to_file(filename: str, buf: bytes) -> None: + with open(filename, "ab") as f: + f.write(buf) + + +def _parse_lindi1_header(buf: bytes): + first_zero_index = buf.find(0) + if first_zero_index == -1: + return None + header_json = buf[:first_zero_index].decode("utf-8") + header: dict = json.loads(header_json) + if header.get('format') != 'lindi1': + raise Exception(f"Not lindi1 format: {header.get('format')}") + return header + + +def _get_file_size_of_remote_file(url: str) -> int: + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3" + } + req = urllib.request.Request(url, headers=headers) + with urllib.request.urlopen(req) as response: + return int(response.headers['Content-Length']) + + +def _create_empty_lindi_file(filename: str, create_binary: bool): + empty_rfs = { + "refs": { + '.zgroup': { + 'zarr_format': 2 + } + }, + } + if create_binary: + rfs_buf = json.dumps(empty_rfs).encode("utf-8") + # start with reasonable padding + total_rfs_allocated_space = 1024 * 1024 + while len(rfs_buf) * 2 > total_rfs_allocated_space: + total_rfs_allocated_space *= 2 + lindi1_header = { + "format": "lindi1", + "rfs_start": 1024, + "rfs_size": len(rfs_buf), + "rfs_padding": total_rfs_allocated_space - len(rfs_buf) + } + lindi1_header_buf = json.dumps(lindi1_header).encode("utf-8") + lindi1_header_buf_padded = lindi1_header_buf + b"\0" * (1024 - len(lindi1_header_buf)) + with 
open(filename, "wb") as f: + f.write(lindi1_header_buf_padded) + f.write(rfs_buf) + f.write(b"\0" * lindi1_header['rfs_padding']) + else: + with open(filename, "w") as f: + json.dump(empty_rfs, f, indent=2) diff --git a/pyproject.toml b/pyproject.toml index 43c0fad..3d25abf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "lindi" -version = "0.3.10" +version = "0.3.14" description = "" authors = [ "Jeremy Magland ", diff --git a/tests/test_local_cache.py b/tests/test_local_cache.py index 7fdb857..4d180ef 100644 --- a/tests/test_local_cache.py +++ b/tests/test_local_cache.py @@ -42,7 +42,7 @@ def test_remote_data_1(): elapsed_0 = elapsed if passnum == 1: elapsed_1 = elapsed - assert elapsed_1 < elapsed_0 * 0.3 # type: ignore + assert elapsed_1 < elapsed_0 * 0.6 # type: ignore def test_put_local_cache(): diff --git a/tests/test_split_contiguous_dataset.py b/tests/test_split_contiguous_dataset.py new file mode 100644 index 0000000..569fec3 --- /dev/null +++ b/tests/test_split_contiguous_dataset.py @@ -0,0 +1,29 @@ +import pytest +import lindi +import h5py + + +@pytest.mark.network +def test_split_contiguous_dataset(): + # https://neurosift.app/?p=/nwb&dandisetId=000935&dandisetVersion=draft&url=https://api.dandiarchive.org/api/assets/e18e787a-544a-438e-8396-f396efb3bd3d/download/ + h5_url = "https://api.dandiarchive.org/api/assets/e18e787a-544a-438e-8396-f396efb3bd3d/download/" + + opts = lindi.LindiH5ZarrStoreOpts( + contiguous_dataset_max_chunk_size=1000 * 1000 * 17 + ) + x = lindi.LindiH5pyFile.from_hdf5_file(h5_url, zarr_store_opts=opts) + d = x['acquisition/ElectricalSeries/data'] + assert isinstance(d, h5py.Dataset) + print(d.shape) + assert d[0][0] == 6.736724784228119e-06 + assert d[10 * 1000 * 1000][0] == -1.0145925267155008e-06 + rfs = x.to_reference_file_system() + zarray = rfs['refs']['acquisition/ElectricalSeries/data/.zarray'] + assert zarray['chunks'] == [66406, 32] + aa = rfs['refs']['acquisition/ElectricalSeries/data/5.0'] + assert aa[1] == 2415072880 + assert aa[2] == 16999936 + + +if __name__ == "__main__": + test_split_contiguous_dataset()
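
The core of the `SplitDatasetH5Item` change above is simple arithmetic: an uncompressed, contiguous HDF5 dataset is presented to zarr as a series of virtual chunks along the first dimension, each roughly `contiguous_dataset_max_chunk_size` bytes, with byte ranges computed directly from the dataset's offset in the HDF5 file. Below is a minimal sketch of that arithmetic as a standalone helper; `plan_split_chunks` is an illustrative name, not part of the lindi API.

```python
import numpy as np


def plan_split_chunks(shape, itemsize, byte_offset, byte_count,
                      max_chunk_size=1000 * 1000 * 20):
    """Sketch of how the split chunks are sized and addressed.

    shape/itemsize describe the contiguous dataset; byte_offset/byte_count are
    its byte range within the HDF5 file; max_chunk_size plays the role of
    LindiH5ZarrStoreOpts.contiguous_dataset_max_chunk_size.
    """
    # Bytes covered by one slice along the first dimension.
    size0 = int(np.prod(shape[1:])) * itemsize
    # nn rows per chunk so that nn * size0 is roughly max_chunk_size (never zero).
    nn = max(max_chunk_size // size0, 1)
    split_chunk_shape = (nn,) + tuple(shape[1:])
    chunk_bytes = int(np.prod(split_chunk_shape)) * itemsize
    total_bytes = int(np.prod(shape)) * itemsize
    num_chunks = (total_bytes + chunk_bytes - 1) // chunk_bytes  # ceil division

    refs = {}
    for i in range(num_chunks):
        start = byte_offset + i * chunk_bytes
        # The final chunk may be truncated; on read the store zero-pads it
        # back to the full chunk size (see _get_padded_size / _pad_chunk).
        count = min(chunk_bytes, byte_offset + byte_count - start)
        key = ".".join([str(i)] + ["0"] * (len(shape) - 1))
        refs[key] = (start, count)
    return split_chunk_shape, refs
```

For the dataset in tests/test_split_contiguous_dataset.py (second dimension 32, an 8-byte dtype, and a 17 MB cap), this gives nn = 17_000_000 // 256 = 66406 and full chunks of 66406 * 32 * 8 = 16,999,936 bytes, consistent with the chunk shape and byte count asserted in that test.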
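The new lindi1 binary layout introduced in lindi/lindi1/_lindi1.py and Lindi1Store.py is: a JSON header zero-padded to 1024 bytes (fields `format`, `rfs_start`, `rfs_size`, `rfs_padding`), followed by the reference file system JSON plus zero padding so it can be rewritten in place as it grows, followed by appended chunk blobs that the rfs references as `['.', byte_offset, byte_count]`. The sketch below reads a file with that layout under those assumptions; the helper names are illustrative and not the lindi API.

```python
import json


def read_lindi1_rfs(filename: str):
    """Parse the zero-padded 1024-byte lindi1 header, then load the rfs JSON."""
    with open(filename, "rb") as f:
        header_buf = f.read(1024)
        first_zero = header_buf.find(0)
        if first_zero == -1:
            raise ValueError("no zero-padded lindi1 header found")
        header = json.loads(header_buf[:first_zero].decode("utf-8"))
        if header.get("format") != "lindi1":
            raise ValueError(f"not lindi1 format: {header.get('format')}")
        f.seek(header["rfs_start"])
        rfs = json.loads(f.read(header["rfs_size"]))
    return header, rfs


def read_same_file_chunk(filename: str, rfs: dict, key: str) -> bytes:
    """Resolve a ['.', offset, size] reference, i.e. a blob that
    Lindi1Store.__setitem__ appended to the same lindi1 file."""
    url, offset, size = rfs["refs"][key]
    assert url == "."  # same-file reference
    with open(filename, "rb") as f:
        f.seek(offset)
        return f.read(size)
```

For example, after running examples/write_lindi_binary.py, `read_lindi1_rfs('test.lindi')` should return the header and rfs, and the large `data` chunks should appear as same-file references resolvable with `read_same_file_chunk`, while small values and the `.zattrs`/`.zarray` keys remain inline in the rfs.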