diff --git a/README.md b/README.md
index fd3b295..054d8f7 100644
--- a/README.md
+++ b/README.md
@@ -18,7 +18,7 @@ LINDI provides:
 - An h5py-like interface for reading from and writing to these data sources that can be used with [pynwb](https://pynwb.readthedocs.io/en/stable/).
 - A mechanism for uploading and downloading these data sources to and from cloud storage, including DANDI.
 
-This project was inspired by [kerchunk](https://github.com/fsspec/kerchunk) and [hdmf-zarr](https://hdmf-zarr.readthedocs.io/en/latest/index.html) and depends on [zarr](https://zarr.readthedocs.io/en/stable/), [h5py](https://www.h5py.org/), [remfile](https://github.com/magland/remfile) and [numcodecs](https://numcodecs.readthedocs.io/en/stable/).
+This project was inspired by [kerchunk](https://github.com/fsspec/kerchunk) and [hdmf-zarr](https://hdmf-zarr.readthedocs.io/en/latest/index.html) and depends on [zarr](https://zarr.readthedocs.io/en/stable/), [h5py](https://www.h5py.org/) and [numcodecs](https://numcodecs.readthedocs.io/en/stable/).
 
 ## Installation
 
@@ -35,39 +35,56 @@ pip install -e .
 
 ## Use cases
 
+* Lazy-load a remote NWB/HDF5 file for efficient access to metadata and data.
 * Represent a remote NWB/HDF5 file as a .nwb.lindi.json file.
 * Read a local or remote .nwb.lindi.json file using pynwb or other tools.
 * Edit a .nwb.lindi.json file using pynwb or other tools.
 * Add datasets to a .nwb.lindi.json file using a local staging area.
-* Upload a .nwb.lindi.json file to a cloud storage service such as DANDI.
+* Upload a .nwb.lindi.json file with staged datasets to a cloud storage service such as DANDI.
+
+### Lazy-load a remote NWB/HDF5 file for efficient access to metadata and data
+
+```python
+import pynwb
+import lindi
+
+# URL of the remote NWB file
+h5_url = "https://api.dandiarchive.org/api/assets/11f512ba-5bcf-4230-a8cb-dc8d36db38cb/download/"
+
+# Set up a local cache
+local_cache = lindi.LocalCache(cache_dir='lindi_cache')
+
+# Create the h5py-like client
+client = lindi.LindiH5pyFile.from_hdf5_file(h5_url, local_cache=local_cache)
+
+# Open using pynwb
+with pynwb.NWBHDF5IO(file=client, mode="r") as io:
+    nwbfile = io.read()
+    print(nwbfile)
+
+# The downloaded data will be cached locally, so subsequent reads will be faster
+```
 
 ### Represent a remote NWB/HDF5 file as a .nwb.lindi.json file
 
 ```python
 import json
-import pynwb
 import lindi
 
 # URL of the remote NWB file
 h5_url = "https://api.dandiarchive.org/api/assets/11f512ba-5bcf-4230-a8cb-dc8d36db38cb/download/"
 
-# Create a read-only Zarr store as a wrapper for the h5 file
-store = lindi.LindiH5ZarrStore.from_file(h5_url)
+# Create the h5py-like client
+client = lindi.LindiH5pyFile.from_hdf5_file(h5_url)
 
 # Generate a reference file system
-rfs = store.to_reference_file_system()
+rfs = client.to_reference_file_system()
 
 # Save it to a file for later use
 with open("example.lindi.json", "w") as f:
     json.dump(rfs, f, indent=2)
 
-# Create an h5py-like client from the reference file system
-client = lindi.LindiH5pyFile.from_reference_file_system(rfs)
-
-# Open using pynwb
-with pynwb.NWBHDF5IO(file=client, mode="r") as io:
-    nwbfile = io.read()
-    print(nwbfile)
+# See the next example for how to read this file
 ```
 
 ### Read a local or remote .nwb.lindi.json file using pynwb or other tools
@@ -79,8 +96,8 @@ import lindi
 # URL of the remote .nwb.lindi.json file
 url = 'https://kerchunk.neurosift.org/dandi/dandisets/000939/assets/11f512ba-5bcf-4230-a8cb-dc8d36db38cb/zarr.json'
 
-# Load the h5py-like client for the reference file system
-client = lindi.LindiH5pyFile.from_reference_file_system(url)
+# Load the h5py-like client
+client = lindi.LindiH5pyFile.from_lindi_file(url)
 
 # Open using pynwb
 with pynwb.NWBHDF5IO(file=client, mode="r") as io:
@@ -121,7 +138,7 @@ url = 'https://lindi.neurosift.org/dandi/dandisets/000939/assets/11f512ba-5bcf-4
 # Load the h5py-like client for the reference file system
 # in read-write mode with a staging area
 with lindi.StagingArea.create(base_dir='lindi_staging') as staging_area:
-    client = lindi.LindiH5pyFile.from_reference_file_system(
+    client = lindi.LindiH5pyFile.from_lindi_file(
         url,
         mode="r+",
         staging_area=staging_area
@@ -130,7 +147,7 @@ with lindi.StagingArea.create(base_dir='lindi_staging') as staging_area:
     # upload the changes to the remote .nwb.lindi.json file
 ```
 
-### Upload a .nwb.lindi.json file to a cloud storage service such as DANDI
+### Upload a .nwb.lindi.json file with staged datasets to a cloud storage service such as DANDI
 
 See [this example](https://github.com/magland/lindi-dandi/blob/main/devel/lindi_test_2.py).
 
diff --git a/docs/special_zarr_annotations.md b/docs/special_zarr_annotations.md
index e9487b5..b00184f 100644
--- a/docs/special_zarr_annotations.md
+++ b/docs/special_zarr_annotations.md
@@ -34,7 +34,7 @@ Note that we do not currently support external links.
 - `object_id`: The object_id attribute of the target object (for validation).
 - `source_object_id`: The object_id attribute of the source object (for validation).
 
-The largely follows the [convention used by hdmf-zarr](https://hdmf-zarr.readthedocs.io/en/latest/storage.html#storing-object-references-in-attributes).
+This largely follows the [convention used by hdmf-zarr](https://hdmf-zarr.readthedocs.io/en/latest/storage.html#storing-object-references-in-attributes).
 
 HDF5 references can appear within both attributes and datasets. For attributes, the value of the attribute is a dict in the above form. For datasets, the value of an item within the dataset is a dict in the above form.
 
@@ -50,4 +50,4 @@ Zarr arrays can represent compound data types from HDF5 datasets. The `_COMPOUND
 
 ### `_EXTERNAL_ARRAY_LINK = {'link_type': 'hdf5_dataset', 'url': '...', 'name': '...'}`
 
-For datasets with an extensive number of chunks such that inclusion in the Zarr or reference file system is impractical, LINDI uses the `_EXTERNAL_ARRAY_LINK` attribute on a Zarr array. This attribute points to an external HDF5 file, specifying the `url` for remote access (or local path) and the `name` of the target dataset within that file. When slicing that dataset, the `LindiH5pyClient` will handle data retrieval, leveraging `h5py` and `remfile` for remote access.
\ No newline at end of file
+For datasets with an extensive number of chunks such that inclusion in the Zarr or reference file system is impractical, LINDI uses the `_EXTERNAL_ARRAY_LINK` attribute on a Zarr array. This attribute points to an external HDF5 file, specifying the `url` for remote access (or local path) and the `name` of the target dataset within that file. When slicing that dataset, the `LindiH5pyClient` will handle data retrieval, leveraging `h5py` and `LindiRemfile` for remote access.
\ No newline at end of file
diff --git a/examples/example2.py b/examples/example2.py
index 069cbd2..1c9f885 100644
--- a/examples/example2.py
+++ b/examples/example2.py
@@ -5,7 +5,7 @@
 url = 'https://kerchunk.neurosift.org/dandi/dandisets/000939/assets/11f512ba-5bcf-4230-a8cb-dc8d36db38cb/zarr.json'
 
 # Load the h5py-like client from the reference file system
-client = lindi.LindiH5pyFile.from_reference_file_system(url)
+client = lindi.LindiH5pyFile.from_lindi_file(url)
 
 # Open using pynwb
 with pynwb.NWBHDF5IO(file=client, mode="r") as io:
diff --git a/examples/example_edit_nwb.py b/examples/example_edit_nwb.py
index e1c6e7b..9446867 100644
--- a/examples/example_edit_nwb.py
+++ b/examples/example_edit_nwb.py
@@ -7,7 +7,7 @@
 url = 'https://kerchunk.neurosift.org/dandi/dandisets/000939/assets/11f512ba-5bcf-4230-a8cb-dc8d36db38cb/zarr.json'
 
 # Load the h5py-like client from the reference file system
-client = lindi.LindiH5pyFile.from_reference_file_system(url, mode='r+')
+client = lindi.LindiH5pyFile.from_lindi_file(url, mode='r+')
 
 # modify the age of the subject
 subject = client['general']['subject']  # type: ignore
diff --git a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py
index 4602d8d..cb83e52 100644
--- a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py
+++ b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py
@@ -4,7 +4,6 @@
 from dataclasses import dataclass
 import numpy as np
 import zarr
-import remfile
 from zarr.storage import Store, MemoryStore
 import h5py
 from ._util import (
@@ -20,6 +19,8 @@
 from ..conversion.h5_filters_to_codecs import h5_filters_to_codecs
 from ..conversion.create_zarr_dataset_from_h5_data import create_zarr_dataset_from_h5_data
 from ..LindiH5pyFile.LindiReferenceFileSystemStore import LindiReferenceFileSystemStore
+from ..LocalCache.LocalCache import LocalCache
+from ..LindiRemfile.LindiRemfile import LindiRemfile
 
 
 @dataclass
@@ -57,7 +58,8 @@ def __init__(
         _file: Union[IO, Any],
         _opts: LindiH5ZarrStoreOpts,
         _url: Union[str, None] = None,
-        _entities_to_close: List[Any]
+        _entities_to_close: List[Any],
+        _local_cache: Union[LocalCache, None] = None
     ):
         """
         Do not call the constructor directly. Instead, use the from_file class
@@ -67,6 +69,7 @@ def __init__(
         self._h5f: Union[h5py.File, None] = h5py.File(_file, "r")
         self._url = _url
         self._opts = _opts
+        self._local_cache = _local_cache
        self._entities_to_close = _entities_to_close + [self._h5f]
 
         # Some datasets do not correspond to traditional chunked datasets. For
@@ -82,6 +85,7 @@ def from_file(
         *,
         opts: LindiH5ZarrStoreOpts = LindiH5ZarrStoreOpts(),
         url: Union[str, None] = None,
+        local_cache: Union[LocalCache, None] = None
     ):
         """
         Create a LindiH5ZarrStore from a file or url pointing to an HDF5 file.
@@ -99,14 +103,19 @@ def from_file(
             local file name, then you will need to set
             opts.num_dataset_chunks_threshold to None, and you will not be able
             to use the to_reference_file_system method.
+        local_cache : LocalCache or None
+            A local cache to use when reading chunks from a remote file. If None,
+            then no local cache is used.
""" if hdf5_file_name_or_url.startswith( "http://" ) or hdf5_file_name_or_url.startswith("https://"): # note that the remfile.File object does not need to be closed - remf = remfile.File(hdf5_file_name_or_url, verbose=False) - return LindiH5ZarrStore(_file=remf, _url=hdf5_file_name_or_url, _opts=opts, _entities_to_close=[]) + remf = LindiRemfile(hdf5_file_name_or_url, verbose=False, local_cache=local_cache) + return LindiH5ZarrStore(_file=remf, _url=hdf5_file_name_or_url, _opts=opts, _entities_to_close=[], _local_cache=local_cache) else: + if local_cache is not None: + raise Exception("local_cache cannot be used with a local file") f = open(hdf5_file_name_or_url, "rb") return LindiH5ZarrStore(_file=f, _url=url, _opts=opts, _entities_to_close=[f]) @@ -334,7 +343,24 @@ def _get_chunk_file_bytes(self, key_parent: str, key_name: str): else: assert byte_offset is not None assert byte_count is not None + if self._local_cache is not None: + assert self._url is not None, "Unexpected: url is None but local_cache is not None" + ch = self._local_cache.get_remote_chunk( + url=self._url, + offset=byte_offset, + size=byte_count + ) + if ch is not None: + return ch buf = _read_bytes(self._file, byte_offset, byte_count) + if self._local_cache is not None: + assert self._url is not None, "Unexpected: url is None but local_cache is not None" + self._local_cache.put_remote_chunk( + url=self._url, + offset=byte_offset, + size=byte_count, + data=buf + ) return buf def _get_chunk_file_bytes_data(self, key_parent: str, key_name: str): @@ -464,7 +490,7 @@ def listdir(self, path: str = "") -> List[str]: def write_reference_file_system(self, output_file_name: str): """Write a reference file system corresponding to this store to a file. - This can then be loaded using LindiH5pyFile.from_reference_file_system(file_name) + This can then be loaded using LindiH5pyFile.from_lindi_file(file_name) """ if not output_file_name.endswith(".lindi.json"): diff --git a/lindi/LindiH5pyFile/FileSegmentReader/DandiFileSegmentReader.py b/lindi/LindiH5pyFile/FileSegmentReader/DandiFileSegmentReader.py deleted file mode 100644 index 35e3b59..0000000 --- a/lindi/LindiH5pyFile/FileSegmentReader/DandiFileSegmentReader.py +++ /dev/null @@ -1,83 +0,0 @@ -from typing import Union -import os -import time -import requests -import remfile -from .FileSegmentReader import FileSegmentReader - - -class DandiFileSegmentReader(FileSegmentReader): - """ - A class that reads a segment of a file from a DANDI URL. - - See the documentation for LindiReferenceFileSystemStore for more information - on why this class is needed. - """ - def __init__(self, url: str): - # we intentionally do not call the super constructor - if not DandiFileSegmentReader.is_dandi_url(url): - raise Exception(f"{url} is not a dandi url") - self.url = url - - # The DandiFile has a get_url() method which is in charge of resolving - # the URL, possibly using the DANDI API token, caching the result, and - # renewing periodically. remfile.File will accept such an object, and - # will call .get_url() as needed. 
- dandi_file = DandiFile(url) - self.remfile = remfile.File(dandi_file, verbose=False) - - def read(self, offset: int, length: int): - self.remfile.seek(offset) - return self.remfile.read(length) - - @staticmethod - def is_dandi_url(url: str): - if url.startswith('https://api.dandiarchive.org/api/'): - return True - if url.startswith('https://api-staging.dandiarchive.org/'): - return True - return False - - -class DandiFile: - def __init__(self, url: str): - """ - Create a new DandiFile which is in charge of resolving a DANDI URL - possibly using the DANDI API token, caching the result, and periodically - renewing the result. - """ - self._url = url - self._resolved_url: Union[str, None] = None - self._resolved_url_timestamp: Union[float, None] = None - - def get_url(self) -> str: - if self._resolved_url is not None and self._resolved_url_timestamp is not None: - if time.time() - self._resolved_url_timestamp < 120: - return self._resolved_url - resolve_with_dandi_api_key = None - if self._url.startswith('https://api.dandiarchive.org/api/'): - dandi_api_key = os.environ.get('DANDI_API_KEY', None) - if dandi_api_key is not None: - resolve_with_dandi_api_key = dandi_api_key - elif self._url.startswith('https://api-staging.dandiarchive.org/'): - dandi_api_key = os.environ.get('DANDI_STAGING_API_KEY', None) - if dandi_api_key is not None: - resolve_with_dandi_api_key = dandi_api_key - url0 = _resolve_dandi_url(url=self._url, dandi_api_key=resolve_with_dandi_api_key) - self._resolved_url = url0 - self._resolved_url_timestamp = time.time() - return self._resolved_url - -# Example of URL resolution with DANDI API token: -# https://api.dandiarchive.org/api/dandisets/000939/versions/0.240318.1555/assets/11f512ba-5bcf-4230-a8cb-dc8d36db38cb/download/ -# resolves to pre-signed S3 URL -# https://dandiarchive.s3.amazonaws.com/blobs/a2b/94f/a2b94f91-6a75-43d8-b5db-21d89449f481?response-content-disposition=attachment%3B%20filename%3D%22sub-A3701_ses-191119_behavior%2Becephys.nwb%22&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAUBRWC5GAEKH3223E%2F20240321%2Fus-east-2%2Fs3%2Faws4_request&X-Amz-Date=20240321T122953Z&X-Amz-Expires=3600&X-Amz-SignedHeaders=host&X-Amz-Signature=ed8cfd7a9d17a226cc18e20b5f32cfe9c467025c226f1abba236793e645dfcb0 - - -def _resolve_dandi_url(url: str, dandi_api_key: Union[str, None]) -> str: - headers = {} - if dandi_api_key is not None: - headers['Authorization'] = f'token {dandi_api_key}' - # do it synchronously here - resp = requests.head(url, allow_redirects=True, headers=headers) - return str(resp.url) diff --git a/lindi/LindiH5pyFile/FileSegmentReader/FileSegmentReader.py b/lindi/LindiH5pyFile/FileSegmentReader/FileSegmentReader.py deleted file mode 100644 index 66c826f..0000000 --- a/lindi/LindiH5pyFile/FileSegmentReader/FileSegmentReader.py +++ /dev/null @@ -1,35 +0,0 @@ -import remfile - - -class FileSegmentReader: - """ - A class that reads a segment of a file from a URL or a local path. - """ - def __init__(self, url: str): - """ - Create a new FileSegmentReader. - - Parameters - ---------- - url : str - The URL of the file to read, or a local path. 
- """ - self.url = url - self.remfile = None - self.local_path = None - if url.startswith("http://") or url.startswith("https://"): - # remfile does not need to be closed - self.remfile = remfile.File(url) - else: - self.local_path = url - - def read(self, offset: int, length: int): - if self.remfile is not None: - self.remfile.seek(offset) - return self.remfile.read(length) - elif self.local_path is not None: - with open(self.local_path, "rb") as f: - f.seek(offset) - return f.read(length) - else: - raise Exception("Unexpected: no remfile or local_path") diff --git a/lindi/LindiH5pyFile/LindiH5pyDataset.py b/lindi/LindiH5pyFile/LindiH5pyDataset.py index 2f97078..1007833 100644 --- a/lindi/LindiH5pyFile/LindiH5pyDataset.py +++ b/lindi/LindiH5pyFile/LindiH5pyDataset.py @@ -2,10 +2,10 @@ import numpy as np import h5py import zarr -import remfile from .LindiH5pyAttributes import LindiH5pyAttributes from .LindiH5pyReference import LindiH5pyReference +from ..LindiRemfile.LindiRemfile import LindiRemfile from ..conversion.decode_references import decode_references @@ -116,7 +116,7 @@ def dtype(self): # but validate seems to work only when I put in vlen = bytes # vlen = bytes - ret = np.dtype(str(ret), metadata={'vlen': vlen}) + ret = np.dtype(str(ret), metadata={'vlen': vlen}) # type: ignore return ret @property @@ -213,13 +213,15 @@ def _get_item_for_zarr(self, zarr_array: zarr.Array, selection: Any): # make sure selection is () if selection != (): raise TypeError(f'Cannot slice a scalar dataset with {selection}') - return zarr_array[0] + # For some reason, with the newest version of zarr (2.18.0) we need to use [:][0] rather than just [0]. + # Otherwise we get an error "ValueError: buffer source array is read-only" + return zarr_array[:][0] return decode_references(zarr_array[selection]) def _get_external_hdf5_client(self, url: str) -> h5py.File: if url not in _external_hdf5_clients: if url.startswith("http://") or url.startswith("https://"): - ff = remfile.File(url) + ff = LindiRemfile(url, local_cache=self._file._local_cache) else: ff = open(url, "rb") # this never gets closed _external_hdf5_clients[url] = h5py.File(ff, "r") diff --git a/lindi/LindiH5pyFile/LindiH5pyFile.py b/lindi/LindiH5pyFile/LindiH5pyFile.py index 1e6fbe1..2c12452 100644 --- a/lindi/LindiH5pyFile/LindiH5pyFile.py +++ b/lindi/LindiH5pyFile/LindiH5pyFile.py @@ -14,9 +14,13 @@ from ..LindiStagingStore.StagingArea import StagingArea from ..LindiStagingStore.LindiStagingStore import LindiStagingStore +from ..LocalCache.LocalCache import LocalCache + +from ..LindiH5ZarrStore._util import _write_rfs_to_file + class LindiH5pyFile(h5py.File): - def __init__(self, _zarr_group: zarr.Group, *, _zarr_store: Union[ZarrStore, None] = None, _mode: Literal["r", "r+"] = "r"): + def __init__(self, _zarr_group: zarr.Group, *, _zarr_store: Union[ZarrStore, None] = None, _mode: Literal["r", "r+"] = "r", _local_cache: Union[LocalCache, None] = None): """ Do not use this constructor directly. 
Instead, use: from_reference_file_system, from_zarr_store, from_zarr_group, @@ -26,20 +30,56 @@ def __init__(self, _zarr_group: zarr.Group, *, _zarr_store: Union[ZarrStore, Non self._zarr_store = _zarr_store self._mode: Literal['r', 'r+'] = _mode self._the_group = LindiH5pyGroup(_zarr_group, self) + self._local_cache = _local_cache # see comment in LindiH5pyGroup self._id = f'{id(self._zarr_group)}/' @staticmethod - def from_reference_file_system(rfs: Union[dict, str], mode: Literal["r", "r+"] = "r", staging_area: Union[StagingArea, None] = None): + def from_lindi_file(url_or_path: str, *, mode: Literal["r", "r+"] = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None): + """ + Create a LindiH5pyFile from a URL or path to a .lindi.json file. + + For a description of parameters, see from_reference_file_system(). + """ + return LindiH5pyFile.from_reference_file_system(url_or_path, mode=mode, staging_area=staging_area, local_cache=local_cache) + + @staticmethod + def from_hdf5_file(url_or_path: str, *, mode: Literal["r", "r+"] = "r", local_cache: Union[LocalCache, None] = None): + """ + Create a LindiH5pyFile from a URL or path to an HDF5 file. + + Parameters + ---------- + url_or_path : str + The URL or path to the remote or local HDF5 file. + mode : Literal["r", "r+"], optional + The mode to open the file object in. Right now only "r" is + supported, by default "r". + local_cache : Union[LocalCache, None], optional + The local cache to use for caching data chunks, by default None. + """ + from ..LindiH5ZarrStore.LindiH5ZarrStore import LindiH5ZarrStore # avoid circular import + if mode == 'r+': + raise Exception("Opening hdf5 file in r+ mode is not supported") + zarr_store = LindiH5ZarrStore.from_file(url_or_path, local_cache=local_cache) + return LindiH5pyFile.from_zarr_store( + zarr_store=zarr_store, + mode=mode, + local_cache=local_cache + ) + + @staticmethod + def from_reference_file_system(rfs: Union[dict, str, None], *, mode: Literal["r", "r+"] = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None): """ Create a LindiH5pyFile from a reference file system. Parameters ---------- - rfs : Union[dict, str] + rfs : Union[dict, str, None] The reference file system. This can be a dictionary or a URL or path - to a .lindi.json file. + to a .lindi.json file. If None, an empty reference file system will + be created. mode : Literal["r", "r+"], optional The mode to open the file object in, by default "r". If the mode is "r", the file object will be read-only. If the mode is "r+", the @@ -48,7 +88,20 @@ def from_reference_file_system(rfs: Union[dict, str], mode: Literal["r", "r+"] = internal in-memory representation will be modified. Use to_reference_file_system() to export the updated reference file system to the same file or a new file. + staging_area : Union[StagingArea, None], optional + The staging area to use for writing data, preparing for upload. + This is only used in write mode, by default None. + local_cache : Union[LocalCache, None], optional + The local cache to use for caching data, by default None. 
""" + if rfs is None: + rfs = { + "refs": { + '.zgroup': { + 'zarr_format': 2 + } + }, + } if staging_area is not None: if mode not in ['r+']: raise Exception("Staging area cannot be used in read-only mode") @@ -61,15 +114,15 @@ def from_reference_file_system(rfs: Union[dict, str], mode: Literal["r", "r+"] = with open(filename, "r") as f: data = json.load(f) assert isinstance(data, dict) # prevent infinite recursion - return LindiH5pyFile.from_reference_file_system(data, mode=mode, staging_area=staging_area) + return LindiH5pyFile.from_reference_file_system(data, mode=mode, staging_area=staging_area, local_cache=local_cache) else: with open(rfs, "r") as f: data = json.load(f) assert isinstance(data, dict) # prevent infinite recursion - return LindiH5pyFile.from_reference_file_system(data, mode=mode, staging_area=staging_area) + return LindiH5pyFile.from_reference_file_system(data, mode=mode, staging_area=staging_area, local_cache=local_cache) elif isinstance(rfs, dict): # This store does not need to be closed - store = LindiReferenceFileSystemStore(rfs) + store = LindiReferenceFileSystemStore(rfs, local_cache=local_cache) if staging_area: store = LindiStagingStore(base_store=store, staging_area=staging_area) return LindiH5pyFile.from_zarr_store(store, mode=mode) @@ -77,7 +130,7 @@ def from_reference_file_system(rfs: Union[dict, str], mode: Literal["r", "r+"] = raise Exception(f"Unhandled type for rfs: {type(rfs)}") @staticmethod - def from_zarr_store(zarr_store: ZarrStore, mode: Literal["r", "r+"] = "r"): + def from_zarr_store(zarr_store: ZarrStore, mode: Literal["r", "r+"] = "r", local_cache: Union[LocalCache, None] = None): """ Create a LindiH5pyFile from a zarr store. @@ -94,10 +147,10 @@ def from_zarr_store(zarr_store: ZarrStore, mode: Literal["r", "r+"] = "r"): # does not need to be closed zarr_group = zarr.open(store=zarr_store, mode=mode) assert isinstance(zarr_group, zarr.Group) - return LindiH5pyFile.from_zarr_group(zarr_group, _zarr_store=zarr_store, mode=mode) + return LindiH5pyFile.from_zarr_group(zarr_group, _zarr_store=zarr_store, mode=mode, local_cache=local_cache) @staticmethod - def from_zarr_group(zarr_group: zarr.Group, *, mode: Literal["r", "r+"] = "r", _zarr_store: Union[ZarrStore, None] = None): + def from_zarr_group(zarr_group: zarr.Group, *, mode: Literal["r", "r+"] = "r", _zarr_store: Union[ZarrStore, None] = None, local_cache: Union[LocalCache, None] = None): """ Create a LindiH5pyFile from a zarr group. @@ -115,27 +168,44 @@ def from_zarr_group(zarr_group: zarr.Group, *, mode: Literal["r", "r+"] = "r", _ See from_zarr_store(). """ - return LindiH5pyFile(zarr_group, _zarr_store=_zarr_store, _mode=mode) + return LindiH5pyFile(zarr_group, _zarr_store=_zarr_store, _mode=mode, _local_cache=local_cache) def to_reference_file_system(self): """ Export the internal in-memory representation to a reference file system. In order to use this, the file object needs to have been created using - from_reference_file_system(). + from_reference_file_system() or from_lindi_file(). 
""" + from ..LindiH5ZarrStore.LindiH5ZarrStore import LindiH5ZarrStore # avoid circular import if self._zarr_store is None: raise Exception("Cannot convert to reference file system without zarr store") zarr_store = self._zarr_store if isinstance(zarr_store, LindiStagingStore): zarr_store = zarr_store._base_store + if isinstance(zarr_store, LindiH5ZarrStore): + return zarr_store.to_reference_file_system() if not isinstance(zarr_store, LindiReferenceFileSystemStore): - raise Exception(f"Unexpected type for zarr store: {type(self._zarr_store)}") + raise Exception(f"Cannot create reference file system when zarr store has type {type(self._zarr_store)}") rfs = zarr_store.rfs rfs_copy = json.loads(json.dumps(rfs)) LindiReferenceFileSystemStore.replace_meta_file_contents_with_dicts_in_rfs(rfs_copy) LindiReferenceFileSystemStore.use_templates_in_rfs(rfs_copy) return rfs_copy + def write_lindi_file(self, filename: str): + """ + Write the reference file system to a .lindi.json file. + + Parameters + ---------- + filename : str + The filename to write to. It must end with '.lindi.json'. + """ + if not filename.endswith(".lindi.json"): + raise Exception("Filename must end with '.lindi.json'") + rfs = self.to_reference_file_system() + _write_rfs_to_file(rfs=rfs, output_file_name=filename) + @property def attrs(self): # type: ignore return LindiH5pyAttributes(self._zarr_group.attrs, readonly=self.mode == "r") diff --git a/lindi/LindiH5pyFile/LindiReferenceFileSystemStore.py b/lindi/LindiH5pyFile/LindiReferenceFileSystemStore.py index 731a549..1fad33f 100644 --- a/lindi/LindiH5pyFile/LindiReferenceFileSystemStore.py +++ b/lindi/LindiH5pyFile/LindiReferenceFileSystemStore.py @@ -1,9 +1,10 @@ -from typing import Literal, Dict +from typing import Literal, Dict, Union import json import base64 +import requests from zarr.storage import Store as ZarrStore -from .FileSegmentReader.FileSegmentReader import FileSegmentReader -from .FileSegmentReader.DandiFileSegmentReader import DandiFileSegmentReader + +from ..LocalCache.LocalCache import LocalCache class LindiReferenceFileSystemStore(ZarrStore): @@ -65,7 +66,7 @@ class LindiReferenceFileSystemStore(ZarrStore): will be reflected immediately in the store. This can be used by experimental tools such as lindi-cloud. """ - def __init__(self, rfs: dict, mode: Literal["r", "r+"] = "r+"): + def __init__(self, rfs: dict, *, mode: Literal["r", "r+"] = "r+", local_cache: Union[LocalCache, None] = None): """ Create a LindiReferenceFileSystemStore. @@ -75,6 +76,9 @@ def __init__(self, rfs: dict, mode: Literal["r", "r+"] = "r+"): The reference file system (see class docstring for details). mode : str The mode to open the store in. Only "r" is supported at this time. + local_cache : LocalCache, optional + The local cache to use for caching data chunks read from the + remote URLs. If None, no caching is done. 
""" if "refs" not in rfs: raise Exception("rfs must contain a 'refs' key") @@ -106,6 +110,7 @@ def __init__(self, rfs: dict, mode: Literal["r", "r+"] = "r+"): self.rfs = rfs self.mode = mode + self.local_cache = local_cache # These methods are overridden from MutableMapping def __getitem__(self, key: str): @@ -128,7 +133,13 @@ def __getitem__(self, key: str): if '{{' in url and 'templates' in self.rfs: for k, v in self.rfs["templates"].items(): url = url.replace("{{" + k + "}}", v) - val = _read_bytes_from_url(url, offset, length) + if self.local_cache is not None: + x = self.local_cache.get_remote_chunk(url=url, offset=offset, size=length) + if x is not None: + return x + val = _read_bytes_from_url_or_path(url, offset, length) + if self.local_cache is not None: + self.local_cache.put_remote_chunk(url=url, offset=offset, size=length, data=val) return val else: # should not happen given checks in __init__, but self.rfs is mutable @@ -217,22 +228,24 @@ def use_templates_in_rfs(rfs: dict) -> None: v[0] = '{{' + template_names_for_urls[url] + '}}' -# Keep a global cache of file segment readers that apply to all instances of -# LindiReferenceFileSystemStore. The key is the URL of the file. -_file_segment_readers: Dict[str, FileSegmentReader] = {} - - -def _read_bytes_from_url(url: str, offset: int, length: int): +def _read_bytes_from_url_or_path(url_or_path: str, offset: int, length: int): """ Read a range of bytes from a URL. """ - if url not in _file_segment_readers: - if DandiFileSegmentReader.is_dandi_url(url): - # This is a DANDI URL, so it needs to be handled specially - # see the docstring for DandiFileSegmentReader for details - file_segment_reader = DandiFileSegmentReader(url) - else: - # This is a non-DANDI URL or local file path - file_segment_reader = FileSegmentReader(url) - _file_segment_readers[url] = file_segment_reader - return _file_segment_readers[url].read(offset, length) + from ..LindiRemfile.LindiRemfile import _resolve_url + if url_or_path.startswith('http://') or url_or_path.startswith('https://'): + url_resolved = _resolve_url(url_or_path) # handle DANDI auth + range_start = offset + range_end = offset + length - 1 + range_header = f"bytes={range_start}-{range_end}" + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3", + "Range": range_header + } + response = requests.get(url_resolved, headers=headers) + response.raise_for_status() + return response.content + else: + with open(url_or_path, 'rb') as f: + f.seek(offset) + return f.read(length) diff --git a/lindi/LindiRemfile/LindiRemfile.py b/lindi/LindiRemfile/LindiRemfile.py new file mode 100644 index 0000000..db3e7ec --- /dev/null +++ b/lindi/LindiRemfile/LindiRemfile.py @@ -0,0 +1,378 @@ +from typing import Union +import time +import os +import requests +from ..LocalCache.LocalCache import LocalCache + +default_min_chunk_size = 128 * 1024 # This is different from Remfile - this is an important decision because it determines the chunk size in the LocalCache +default_max_cache_size = 1024 * 1024 * 1024 +default_chunk_increment_factor = 1.7 +default_max_chunk_size = 100 * 1024 * 1024 + + +class LindiRemfile: + def __init__( + self, + url: str, + *, + verbose: bool = False, + local_cache: Union[LocalCache, None], + _min_chunk_size: int = default_min_chunk_size, # recommended not to change because it will make the accumulated cache in LocalCache useless + _max_cache_size: int = default_max_cache_size, + _chunk_increment_factor: 
float = default_chunk_increment_factor, + _max_chunk_size: int = default_max_cache_size, + _impose_request_failures_for_testing: bool = False, + ): + """Create a file-like object for reading a remote file. Optimized for reading hdf5 files. The arguments starting with an underscore are for testing and debugging purposes - they may experience breaking changes in the future. + + Args: + url (str): The url of the remote file, or an object with a .get_url() method. The latter is useful if the url is a presigned AWS URL that expires after a certain amount of time. + verbose (bool, optional): Whether to print info for debugging. Defaults to False. + local_cache (LocalCache, optional): The local cache for storing the chunks of the file. Defaults to None. + _min_chunk_size (int, optional): The minimum chunk size. When reading, the chunks will be loaded in multiples of this size. + _max_cache_size (int, optional): The maximum number of bytes to keep in the cache. + _chunk_increment_factor (int, optional): The factor by which to increase the number of chunks to load when the system detects that the chunks are being loaded in order. + _max_chunk_size (int, optional): The maximum chunk size. When reading, the chunks will be loaded in multiples of the minimum chunk size up to this size. + _impose_request_failures_for_testing (bool, optional): Whether to impose request failures for testing purposes. Defaults to False. + + Difference between this and regular RemFile? + Uses Lindi's LocalCache instead of Remfile's DiskCache + Requires that url is a string (does not accept object with .get_url() function) + Does not support using multiple threads + Does not use memory cache if LocalCache is specified + Handles DANDI authentication + + A note: + In the context of LINDI, this LindiRemfile is going to be used for loading + metadata of the hdf5 file. The large chunks are going to be loaded using + a different (Zarr) mechanism. That's one reason for the above differences. + """ + if not isinstance(url, str): + # Do this check, because regular Remfile allows for objects + raise Exception('Only string urls are supported for LindiRemfile') + self._url = url + self._verbose = verbose + self._local_cache = local_cache + self._min_chunk_size = _min_chunk_size + self._max_chunks_in_cache = int(_max_cache_size / _min_chunk_size) + self._chunk_increment_factor = _chunk_increment_factor + self._max_chunk_size = _max_chunk_size + self._impose_request_failures_for_testing = _impose_request_failures_for_testing + self._memory_chunks = {} + self._memory_chunk_indices: list[ + int + ] = ( + [] + ) # list of chunk indices in order of loading for purposes of cleaning up the cache + self._position = 0 + self._smart_loader_last_chunk_index_accessed = -99 + self._smart_loader_chunk_sequence_length = 1 + + # use aborted GET request rather than HEAD request to get the length + # this is needed for presigned AWS URLs because HEAD requests are not supported + response = requests.get(_resolve_url(self._url), stream=True) + if response.status_code == 200: + self.length = int(response.headers["Content-Length"]) + else: + raise Exception( + f"Error getting file length: {response.status_code} {response.reason}" + ) + # Close the connection without reading the content to avoid downloading the whole file + response.close() + + # response = requests.head(_get_url_str(self._url)) + # self.length = int(response.headers['Content-Length']) + self.session = requests.Session() + + def read(self, size=None): + """Read bytes from the file. 
+ + Args: + size (_type_): The number of bytes to read. + + Raises: + Exception: If the size argument is not provided. + + Returns: + bytes: The bytes read. + """ + if size is None: + raise Exception( + "The size argument must be provided in remfile" + ) # pragma: no cover + + chunk_start_index = self._position // self._min_chunk_size + chunk_end_index = (self._position + size - 1) // self._min_chunk_size + loaded_chunks = {} + for chunk_index in range(chunk_start_index, chunk_end_index + 1): + loaded_chunks[chunk_index] = self._load_chunk(chunk_index) + if chunk_end_index == chunk_start_index: + chunk = loaded_chunks[chunk_start_index] + chunk_offset = self._position % self._min_chunk_size + chunk_length = size + self._position += size + return chunk[chunk_offset: chunk_offset + chunk_length] + else: + pieces_to_concat = [] + for chunk_index in range(chunk_start_index, chunk_end_index + 1): + chunk = loaded_chunks[chunk_index] + if chunk_index == chunk_start_index: + chunk_offset = self._position % self._min_chunk_size + chunk_length = self._min_chunk_size - chunk_offset + elif chunk_index == chunk_end_index: + chunk_offset = 0 + chunk_length = size - sum([len(p) for p in pieces_to_concat]) + else: + chunk_offset = 0 + chunk_length = self._min_chunk_size + pieces_to_concat.append( + chunk[chunk_offset: chunk_offset + chunk_length] + ) + ret = b"".join(pieces_to_concat) + self._position += size + + # clean up the cache + if len(self._memory_chunk_indices) > self._max_chunks_in_cache: + if self._verbose: + print("Cleaning up cache") + for chunk_index in self._memory_chunk_indices[ + : int(self._max_chunks_in_cache * 0.5) + ]: + if chunk_index in self._memory_chunks: + del self._memory_chunks[chunk_index] + else: + # it is possible that the chunk was already deleted (repeated chunk index in the list) + pass + self._memory_chunk_indices = self._memory_chunk_indices[ + int(self._max_chunks_in_cache * 0.5): + ] + + return ret + + def _load_chunk(self, chunk_index: int) -> bytes: + """Load a chunk of the file. + + Args: + chunk_index (int): The index of the chunk to load. 
+ """ + if chunk_index in self._memory_chunks: + self._smart_loader_last_chunk_index_accessed = chunk_index + return self._memory_chunks[chunk_index] + + if self._local_cache is not None: + cached_value = self._local_cache.get_remote_chunk( + url=self._url, + offset=chunk_index * self._min_chunk_size, + size=min(self._min_chunk_size, self.length - chunk_index * self._min_chunk_size), + ) + if cached_value is not None: + if self._local_cache is None: + self._memory_chunks[chunk_index] = cached_value + self._memory_chunk_indices.append(chunk_index) + self._smart_loader_last_chunk_index_accessed = chunk_index + return cached_value + + if chunk_index == self._smart_loader_last_chunk_index_accessed + 1: + # round up to the chunk sequence length times 1.7 + self._smart_loader_chunk_sequence_length = round( + self._smart_loader_chunk_sequence_length * 1.7 + 0.5 + ) + if ( + self._smart_loader_chunk_sequence_length > self._max_chunk_size / self._min_chunk_size + ): + self._smart_loader_chunk_sequence_length = int( + self._max_chunk_size / self._min_chunk_size + ) + # make sure the chunk sequence length is valid + for j in range(1, self._smart_loader_chunk_sequence_length): + if chunk_index + j in self._memory_chunks: + # already loaded this chunk + self._smart_loader_chunk_sequence_length = j + break + else: + self._smart_loader_chunk_sequence_length = round( + self._smart_loader_chunk_sequence_length / 1.7 + 0.5 + ) + data_start = chunk_index * self._min_chunk_size + data_end = ( + data_start + self._min_chunk_size * self._smart_loader_chunk_sequence_length - 1 + ) + if self._verbose: + print( + f"Loading {self._smart_loader_chunk_sequence_length} chunks starting at {chunk_index} ({(data_end - data_start + 1)/1e6} million bytes)" + ) + if data_end >= self.length: + data_end = self.length - 1 + x = _get_bytes( + self.session, + _resolve_url(self._url), + data_start, + data_end, + verbose=self._verbose, + _impose_request_failures_for_testing=self._impose_request_failures_for_testing, + ) + if not x: + raise Exception(f'Error loading chunk {chunk_index} from {self._url}') + if self._smart_loader_chunk_sequence_length == 1: + if self._local_cache is None: + self._memory_chunks[chunk_index] = x + if self._local_cache is not None: + self._local_cache.put_remote_chunk( + url=self._url, + offset=chunk_index * self._min_chunk_size, + size=min(self._min_chunk_size, self.length - chunk_index * self._min_chunk_size), + data=x + ) + self._memory_chunk_indices.append(chunk_index) + else: + for i in range(self._smart_loader_chunk_sequence_length): + if i * self._min_chunk_size >= len(x): + break + if self._local_cache is None: + self._memory_chunks[chunk_index + i] = x[ + i * self._min_chunk_size: (i + 1) * self._min_chunk_size + ] + self._memory_chunk_indices.append(chunk_index + i) + if self._local_cache is not None: + size = min(self._min_chunk_size, self.length - (chunk_index + i) * self._min_chunk_size) + data = x[i * self._min_chunk_size: (i + 1) * self._min_chunk_size] + if len(data) != size: + raise ValueError(f'Unexpected: len(data) != size: {len(data)} != {size}') + self._local_cache.put_remote_chunk( + url=self._url, + offset=(chunk_index + i) * self._min_chunk_size, + size=size, + data=data + ) + self._smart_loader_last_chunk_index_accessed = ( + chunk_index + self._smart_loader_chunk_sequence_length - 1 + ) + return x[: self._min_chunk_size] + + def seek(self, offset: int, whence: int = 0): + """Seek to a position in the file. + + Args: + offset (int): The offset to seek to. 
+ whence (int, optional): The code for the reference point for the offset. Defaults to 0. + + Raises: + ValueError: If the whence argument is not 0, 1, or 2. + """ + if whence == 0: + self._position = offset + elif whence == 1: + self._position += offset # pragma: no cover + elif whence == 2: + self._position = self.length + offset + else: + raise ValueError( + "Invalid argument: 'whence' must be 0, 1, or 2." + ) # pragma: no cover + + def tell(self): + return self._position + + def close(self): + pass + + +_num_request_retries = 8 + + +def _get_bytes( + session: requests.Session, + url: str, + start_byte: int, + end_byte: int, + *, + verbose=False, + _impose_request_failures_for_testing=False, +): + """Get bytes from a remote file. + + Args: + url (str): The url of the remote file. + start_byte (int): The first byte to get. + end_byte (int): The last byte to get. + verbose (bool, optional): Whether to print info for debugging. Defaults to False. + + Returns: + _type_: _description_ + """ + # Function to be used in threads for fetching the byte ranges + def fetch_bytes(range_start: int, range_end: int, num_retries: int, verbose: bool): + """Fetch a range of bytes from a remote file using the range header + + Args: + range_start (int): The first byte to get. + range_end (int): The last byte to get. + num_retries (int): The number of retries. + + Returns: + bytes: The bytes fetched. + """ + for try_num in range(num_retries + 1): + try: + actual_url = url + if _impose_request_failures_for_testing: + if try_num == 0: + actual_url = "_error_" + url + range_header = f"bytes={range_start}-{range_end}" + # response = requests.get(actual_url, headers={'Range': range_header}) + # use session to avoid creating a new connection each time + response = session.get(actual_url, headers={"Range": range_header}) + return response.content + except Exception as e: + if try_num == num_retries: + raise e # pragma: no cover + else: + delay = 0.1 * 2**try_num + if verbose: + print(f"Retrying after exception: {e}") + print(f"Waiting {delay} seconds") + time.sleep(delay) + + return fetch_bytes(start_byte, end_byte, _num_request_retries, verbose) + + +_global_resolved_urls = {} # url -> {timestamp, url} + + +def _is_dandi_url(url: str): + if url.startswith('https://api.dandiarchive.org/api/'): + return True + if url.startswith('https://api-staging.dandiarchive.org/'): + return True + return False + + +def _resolve_dandi_url(url: str): + resolve_with_dandi_api_key = None + if url.startswith('https://api.dandiarchive.org/api/'): + dandi_api_key = os.environ.get('DANDI_API_KEY', None) + if dandi_api_key is not None: + resolve_with_dandi_api_key = dandi_api_key + elif url.startswith('https://api-staging.dandiarchive.org/'): + dandi_api_key = os.environ.get('DANDI_STAGING_API_KEY', None) + if dandi_api_key is not None: + resolve_with_dandi_api_key = dandi_api_key + headers = {} + if resolve_with_dandi_api_key is not None: + headers['Authorization'] = f'token {resolve_with_dandi_api_key}' + # do it synchronously here + resp = requests.head(url, allow_redirects=True, headers=headers) + return str(resp.url) + + +def _resolve_url(url: str): + if url in _global_resolved_urls: + elapsed = time.time() - _global_resolved_urls[url]["timestamp"] + if elapsed < 60 * 10: + return _global_resolved_urls[url]["url"] + if _is_dandi_url(url): + resolved_url = _resolve_dandi_url(url) + else: + resolved_url = url + _global_resolved_urls[url] = {"timestamp": time.time(), "url": resolved_url} + return resolved_url diff --git 
a/lindi/LindiH5pyFile/FileSegmentReader/__init__.py b/lindi/LindiRemfile/__init__.py similarity index 100% rename from lindi/LindiH5pyFile/FileSegmentReader/__init__.py rename to lindi/LindiRemfile/__init__.py diff --git a/lindi/LocalCache/LocalCache.py b/lindi/LocalCache/LocalCache.py new file mode 100644 index 0000000..a244eb7 --- /dev/null +++ b/lindi/LocalCache/LocalCache.py @@ -0,0 +1,67 @@ +from typing import Union +import os + + +class LocalCache: + def __init__(self, *, cache_dir: Union[str, None] = None): + if cache_dir is None: + # use ~/.lindi/cache as default cache directory + cache_dir = os.path.expanduser("~/.lindi/cache") + self._cache_dir = cache_dir + os.makedirs(self._cache_dir, exist_ok=True) + self._sqlite_db_fname = os.path.join(self._cache_dir, "lindi_cache.db") + self._sqlite_client = LocalCacheSQLiteClient(db_fname=self._sqlite_db_fname) + + def get_remote_chunk(self, *, url: str, offset: int, size: int): + return self._sqlite_client.get_remote_chunk(url=url, offset=offset, size=size) + + def put_remote_chunk(self, *, url: str, offset: int, size: int, data: bytes): + if len(data) != size: + raise ValueError("data size does not match size") + self._sqlite_client.put_remote_chunk(url=url, offset=offset, size=size, data=data) + + +class LocalCacheSQLiteClient: + def __init__(self, *, db_fname: str): + import sqlite3 + self._db_fname = db_fname + self._conn = sqlite3.connect(self._db_fname) + self._cursor = self._conn.cursor() + self._cursor.execute( + """ + PRAGMA journal_mode=WAL + """ + ) + self._cursor.execute( + """ + CREATE TABLE IF NOT EXISTS remote_chunks ( + url TEXT, + offset INTEGER, + size INTEGER, + data BLOB, + PRIMARY KEY (url, offset, size) + ) + """ + ) + self._conn.commit() + + def get_remote_chunk(self, *, url: str, offset: int, size: int): + self._cursor.execute( + """ + SELECT data FROM remote_chunks WHERE url = ? AND offset = ? AND size = ? + """, + (url, offset, size), + ) + row = self._cursor.fetchone() + if row is None: + return None + return row[0] + + def put_remote_chunk(self, *, url: str, offset: int, size: int, data: bytes): + self._cursor.execute( + """ + INSERT OR REPLACE INTO remote_chunks (url, offset, size, data) VALUES (?, ?, ?, ?) 
+ """, + (url, offset, size, data), + ) + self._conn.commit() diff --git a/lindi/LocalCache/__init__.py b/lindi/LocalCache/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/lindi/__init__.py b/lindi/__init__.py index 794e1cc..f50d436 100644 --- a/lindi/__init__.py +++ b/lindi/__init__.py @@ -1,3 +1,4 @@ from .LindiH5ZarrStore import LindiH5ZarrStore, LindiH5ZarrStoreOpts # noqa: F401 from .LindiH5pyFile import LindiH5pyFile, LindiH5pyGroup, LindiH5pyDataset, LindiH5pyHardLink, LindiH5pySoftLink # noqa: F401 from .LindiStagingStore import LindiStagingStore, StagingArea # noqa: F401 +from .LocalCache.LocalCache import LocalCache # noqa: F401 diff --git a/pyproject.toml b/pyproject.toml index 2a88f0f..842e667 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,7 +14,6 @@ python = "^3.8" numcodecs = "^0.12.1" zarr = "^2.16.1" h5py = "^3.10.0" -remfile = "^0.1.9" requests = "^2.31.0" [tool.poetry.group.dev.dependencies] diff --git a/tests/test_local_cache.py b/tests/test_local_cache.py new file mode 100644 index 0000000..4f7644e --- /dev/null +++ b/tests/test_local_cache.py @@ -0,0 +1,49 @@ +import tempfile +import json +import time +import pytest +import lindi + + +@pytest.mark.network +def test_remote_data_1(): + with tempfile.TemporaryDirectory() as tmpdir: + local_cache = lindi.LocalCache(cache_dir=tmpdir + '/local_cache') + for passnum in range(2): + import pynwb + + # Define the URL for a remote NWB file + h5_url = "https://api.dandiarchive.org/api/assets/11f512ba-5bcf-4230-a8cb-dc8d36db38cb/download/" + + # Create a read-only Zarr store as a wrapper for the h5 file + store = lindi.LindiH5ZarrStore.from_file(h5_url) + + # Generate a reference file system + rfs = store.to_reference_file_system() + + # Save it to a file for later use + with open("example.nwb.lindi.json", "w") as f: + json.dump(rfs, f, indent=2) + + # Create an h5py-like client from the reference file system + client = lindi.LindiH5pyFile.from_reference_file_system(rfs, local_cache=local_cache) + + # Open using pynwb + timer = time.time() + with pynwb.NWBHDF5IO(file=client, mode="r") as io: + nwbfile = io.read() + print(nwbfile) + x = client["/processing/ecephys/LFP/LFP/data"] + assert isinstance(x, lindi.LindiH5pyDataset) + x[:1000] + elapsed = time.time() - timer + print('Elapsed time:', elapsed) + if passnum == 0: + elapsed_0 = elapsed + if passnum == 1: + elapsed_1 = elapsed + assert elapsed_1 < elapsed_0 * 0.3 # type: ignore + + +if __name__ == "__main__": + test_remote_data_1() diff --git a/tests/test_staging_area.py b/tests/test_staging_area.py index e79f58b..c2e2847 100644 --- a/tests/test_staging_area.py +++ b/tests/test_staging_area.py @@ -8,8 +8,7 @@ def test_staging_area(): with tempfile.TemporaryDirectory() as tmpdir: staging_area = lindi.StagingArea.create(tmpdir + '/staging_area') - empty_rfs = {'refs': {'.zgroup': {'zarr_format': 2}}} - client = lindi.LindiH5pyFile.from_reference_file_system(empty_rfs, mode='r+', staging_area=staging_area) + client = lindi.LindiH5pyFile.from_reference_file_system(None, mode='r+', staging_area=staging_area) X = np.random.randn(1000, 1000).astype(np.float32) client.create_dataset('large_array', data=X, chunks=(400, 400)) total_size = _get_total_size_of_directory(tmpdir) @@ -43,7 +42,7 @@ def on_upload_main(fname: str): consolidate_chunks=True ) - client3 = lindi.LindiH5pyFile.from_reference_file_system(output_fname, mode='r') + client3 = lindi.LindiH5pyFile.from_lindi_file(output_fname, mode='r') X3 = client3['large_array'] assert isinstance(X3, 
lindi.LindiH5pyDataset) assert np.allclose(X1[:], X3[:])