Skip to content

Commit

Permalink
single file binary lindi format
Browse files Browse the repository at this point in the history
  • Loading branch information
magland committed Aug 1, 2024
1 parent 3b9f2e9 commit e7345bd
Show file tree
Hide file tree
Showing 6 changed files with 385 additions and 60 deletions.
20 changes: 20 additions & 0 deletions examples/write_lindi_binary.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import lindi


def write_lindi_binary():
    """Create 'test.lindi' as a binary lindi file with one attribute and one dataset.

    The file is opened in write mode with create_binary=True, a 'test'
    attribute is set to 42, and a 1000 x 1000 float32 dataset named 'data'
    is created and filled with 42.
    """
    target = lindi.LindiH5pyFile.from_lindi_file(
        'test.lindi',
        mode='w',
        create_binary=True,
    )
    # Leaving the with-block closes the file.
    with target as out_file:
        out_file.attrs['test'] = 42
        data = out_file.create_dataset('data', shape=(1000, 1000), dtype='f4')
        data[...] = 42


def test_read():
    """Open 'test.lindi' read-only and print the 'test' attribute and data[0, 0].

    Relies on 'test.lindi' already existing in the working directory (the
    script entry point writes it with write_lindi_binary() first).
    """
    # Use the context-manager form so the file is closed even if one of the
    # lookups below raises; the original only closed it on the success path.
    with lindi.LindiH5pyFile.from_lindi_file('test.lindi', mode='r') as f:
        print(f.attrs['test'])
        print(f['data'][0, 0])


if __name__ == "__main__":
    # Write the file first so that the read-back below has something to open.
    write_lindi_binary()
    test_read()
159 changes: 100 additions & 59 deletions lindi/LindiH5pyFile/LindiH5pyFile.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import os
import json
import tempfile
import urllib.request
import h5py
import zarr
from zarr.storage import Store as ZarrStore
Expand All @@ -15,11 +14,14 @@
from ..LindiStagingStore.StagingArea import StagingArea
from ..LindiStagingStore.LindiStagingStore import LindiStagingStore, _apply_templates
from ..LindiH5ZarrStore.LindiH5ZarrStoreOpts import LindiH5ZarrStoreOpts
from ..lindi1 import Lindi1Store

from ..LocalCache.LocalCache import LocalCache

from ..LindiH5ZarrStore._util import _write_rfs_to_file

from ..lindi1._lindi1 import _load_rfs_dict_from_local_lindi_file, _load_rfs_dict_from_remote_lindi_file, _write_rfs_dict_to_lindi1_file, _create_empty_lindi_file


LindiFileMode = Literal["r", "r+", "w", "w-", "x", "a"]

Expand All @@ -29,7 +31,7 @@


class LindiH5pyFile(h5py.File):
def __init__(self, _zarr_group: zarr.Group, *, _zarr_store: Union[ZarrStore, None] = None, _mode: LindiFileMode = "r", _local_cache: Union[LocalCache, None] = None, _local_file_path: Union[str, None] = None):
def __init__(self, _zarr_group: zarr.Group, *, _zarr_store: Union[ZarrStore, None] = None, _mode: LindiFileMode = "r", _local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_is_lindi1_format: Union[bool, None] = None):
"""
Do not use this constructor directly. Instead, use: from_lindi_file,
from_h5py_file, from_reference_file_system, from_zarr_store, or
Expand All @@ -40,22 +42,26 @@ def __init__(self, _zarr_group: zarr.Group, *, _zarr_store: Union[ZarrStore, Non
self._mode: LindiFileMode = _mode
self._the_group = LindiH5pyGroup(_zarr_group, self)
self._local_cache = _local_cache
self._local_file_path = _local_file_path
self._source_url_or_path = _source_url_or_path
self._source_is_lindi1_format = _source_is_lindi1_format

# see comment in LindiH5pyGroup
self._id = f'{id(self._zarr_group)}/'

@staticmethod
def from_lindi_file(url_or_path: str, *, mode: LindiFileMode = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None, local_file_path: Union[str, None] = None):
def from_lindi_file(url_or_path: str, *, mode: LindiFileMode = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None, create_binary: bool = False):
"""
Create a LindiH5pyFile from a URL or path to a .lindi.json file or a binary .lindi file.
For a description of parameters, see from_reference_file_system().
"""
if local_file_path is None:
if not url_or_path.startswith("http://") and not url_or_path.startswith("https://"):
local_file_path = url_or_path
return LindiH5pyFile.from_reference_file_system(url_or_path, mode=mode, staging_area=staging_area, local_cache=local_cache, local_file_path=local_file_path)
return LindiH5pyFile.from_reference_file_system(
url_or_path,
mode=mode,
staging_area=staging_area,
local_cache=local_cache,
create_binary=create_binary
)

@staticmethod
def from_hdf5_file(
Expand Down Expand Up @@ -95,11 +101,13 @@ def from_hdf5_file(
return LindiH5pyFile.from_zarr_store(
zarr_store=zarr_store,
mode=mode,
local_cache=local_cache
local_cache=local_cache,
_source_url_or_path=None,
_source_is_lindi1_format=None
)

@staticmethod
def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMode = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None, local_file_path: Union[str, None] = None):
def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMode = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_is_lindi1_format: Union[bool, None] = None, create_binary: bool = False):
"""
Create a LindiH5pyFile from a reference file system.
Expand All @@ -116,11 +124,11 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMo
is only used in write mode, by default None.
local_cache : Union[LocalCache, None], optional
The local cache to use for caching data, by default None.
local_file_path : Union[str, None], optional
If rfs is not a string or is a remote url, this is the path to the
local file for the purpose of writing to it. It is required in this
case if mode is not "r". If rfs is a string and not a remote url, it
must be equal to local_file_path if provided.
_source_url_or_path : Union[str, None], optional
_source_is_lindi1_format : Union[bool, None], optional
create_binary : bool, optional
Only applies when writing a new file. If True, a binary lindi file
will be created.
"""
if rfs is None:
rfs = {
Expand All @@ -131,26 +139,32 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMo
},
}

if create_binary:
if mode not in ["w", "w-", "x"]:
raise Exception("create_binary is only supported in write mode")

if isinstance(rfs, str):
rfs_is_url = rfs.startswith("http://") or rfs.startswith("https://")
if local_file_path is not None and not rfs_is_url and rfs != local_file_path:
raise Exception(f"rfs is not a remote url, so local_file_path must be the same as rfs, but got: {rfs} and {local_file_path}")
if rfs_is_url:
with tempfile.TemporaryDirectory() as tmpdir:
filename = f"{tmpdir}/temp.lindi.json"
_download_file(rfs, filename)
with open(filename, "r") as f:
data = json.load(f)
assert isinstance(data, dict) # prevent infinite recursion
return LindiH5pyFile.from_reference_file_system(data, mode=mode, staging_area=staging_area, local_cache=local_cache, local_file_path=local_file_path)
if create_binary:
raise Exception("create_binary is not supported with remote files")
data, is_lindi1_format = _load_rfs_dict_from_remote_lindi_file(rfs)
assert isinstance(data, dict) # prevent infinite recursion
if _source_url_or_path is not None:
if _source_url_or_path != rfs:
raise Exception(f"source_url_or_path must match rfs if rfs is a remote file, got: {rfs}, {_source_url_or_path}")
if _source_is_lindi1_format is not None:
if _source_is_lindi1_format != is_lindi1_format:
raise Exception(f"source_is_lindi1_format must match is_lindi1_format if rfs is a remote file, got: {is_lindi1_format}, {_source_is_lindi1_format}")
return LindiH5pyFile.from_reference_file_system(
data,
mode=mode,
staging_area=staging_area,
local_cache=local_cache,
_source_url_or_path=rfs,
_source_is_lindi1_format=is_lindi1_format
)
else:
empty_rfs = {
"refs": {
'.zgroup': {
'zarr_format': 2
}
},
}
if mode == "r":
# Readonly, file must exist (default)
if not os.path.exists(rfs):
Expand All @@ -161,36 +175,59 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMo
raise Exception(f"File does not exist: {rfs}")
elif mode == "w":
# Create file, truncate if exists
with open(rfs, "w") as f:
json.dump(empty_rfs, f)
_create_empty_lindi_file(rfs, create_binary=create_binary)
elif mode in ["w-", "x"]:
# Create file, fail if exists
if os.path.exists(rfs):
raise Exception(f"File already exists: {rfs}")
with open(rfs, "w") as f:
json.dump(empty_rfs, f)
_create_empty_lindi_file(rfs, create_binary=create_binary)
elif mode == "a":
# Read/write if exists, create otherwise
if os.path.exists(rfs):
with open(rfs, "r") as f:
data = json.load(f)
else:
_create_empty_lindi_file(rfs, create_binary=create_binary)
else:
raise Exception(f"Unhandled mode: {mode}")
with open(rfs, "r") as f:
data = json.load(f)
data, is_lindi1_format = _load_rfs_dict_from_local_lindi_file(rfs)
assert isinstance(data, dict) # prevent infinite recursion
return LindiH5pyFile.from_reference_file_system(data, mode=mode, staging_area=staging_area, local_cache=local_cache, local_file_path=local_file_path)
if _source_url_or_path is not None:
if _source_url_or_path != rfs:
raise Exception(f"source_url_or_path must match rfs if rfs is a local file, got: {rfs}, {_source_url_or_path}")
if _source_is_lindi1_format is not None:
if _source_is_lindi1_format != is_lindi1_format:
raise Exception(f"source_is_lindi1_format must match is_lindi1_format if rfs is a local file, got: {is_lindi1_format}, {_source_is_lindi1_format}")
return LindiH5pyFile.from_reference_file_system(
data, mode=mode,
staging_area=staging_area,
local_cache=local_cache,
_source_url_or_path=rfs,
_source_is_lindi1_format=is_lindi1_format
)
elif isinstance(rfs, dict):
# This store does not need to be closed
store = LindiReferenceFileSystemStore(rfs, local_cache=local_cache)
store = LindiReferenceFileSystemStore(rfs, local_cache=local_cache, _source_url_or_path=_source_url_or_path)
if staging_area:
if _source_is_lindi1_format:
raise Exception("Staging area is not supported with lindi1 format")
store = LindiStagingStore(base_store=store, staging_area=staging_area)
return LindiH5pyFile.from_zarr_store(store, mode=mode, local_file_path=local_file_path, local_cache=local_cache)
elif _source_is_lindi1_format and _source_url_or_path is not None:
is_url = _source_url_or_path.startswith("http://") or _source_url_or_path.startswith("https://")
if not is_url:
store = Lindi1Store(base_store=store, lindi1_file_name=_source_url_or_path)
return LindiH5pyFile.from_zarr_store(
store,
mode=mode,
local_cache=local_cache,
_source_url_or_path=_source_url_or_path,
_source_is_lindi1_format=_source_is_lindi1_format
)
else:
raise Exception(f"Unhandled type for rfs: {type(rfs)}")

@staticmethod
def from_zarr_store(zarr_store: ZarrStore, mode: LindiFileMode = "r", local_cache: Union[LocalCache, None] = None, local_file_path: Union[str, None] = None):
def from_zarr_store(zarr_store: ZarrStore, mode: LindiFileMode = "r", local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_is_lindi1_format: Union[bool, None] = None):
"""
Create a LindiH5pyFile from a zarr store.
Expand All @@ -207,10 +244,17 @@ def from_zarr_store(zarr_store: ZarrStore, mode: LindiFileMode = "r", local_cach
# does not need to be closed
zarr_group = zarr.open(store=zarr_store, mode=mode)
assert isinstance(zarr_group, zarr.Group)
return LindiH5pyFile.from_zarr_group(zarr_group, _zarr_store=zarr_store, mode=mode, local_cache=local_cache, local_file_path=local_file_path)
return LindiH5pyFile.from_zarr_group(
zarr_group,
_zarr_store=zarr_store,
mode=mode,
local_cache=local_cache,
_source_url_or_path=_source_url_or_path,
_source_is_lindi1_format=_source_is_lindi1_format
)

@staticmethod
def from_zarr_group(zarr_group: zarr.Group, *, mode: LindiFileMode = "r", _zarr_store: Union[ZarrStore, None] = None, local_cache: Union[LocalCache, None] = None, local_file_path: Union[str, None] = None):
def from_zarr_group(zarr_group: zarr.Group, *, mode: LindiFileMode = "r", _zarr_store: Union[ZarrStore, None] = None, local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_is_lindi1_format: Union[bool, None] = None):
"""
Create a LindiH5pyFile from a zarr group.
Expand All @@ -228,7 +272,7 @@ def from_zarr_group(zarr_group: zarr.Group, *, mode: LindiFileMode = "r", _zarr_
See from_zarr_store().
"""
return LindiH5pyFile(zarr_group, _zarr_store=_zarr_store, _mode=mode, _local_cache=local_cache, _local_file_path=local_file_path)
return LindiH5pyFile(zarr_group, _zarr_store=_zarr_store, _mode=mode, _local_cache=local_cache, _source_url_or_path=_source_url_or_path, _source_is_lindi1_format=_source_is_lindi1_format)

def to_reference_file_system(self):
"""
Expand All @@ -241,6 +285,8 @@ def to_reference_file_system(self):
if isinstance(zarr_store, LindiStagingStore):
zarr_store.consolidate_chunks()
zarr_store = zarr_store._base_store
if isinstance(zarr_store, Lindi1Store):
zarr_store = zarr_store._base_store
if isinstance(zarr_store, LindiH5ZarrStore):
return zarr_store.to_reference_file_system()
if not isinstance(zarr_store, LindiReferenceFileSystemStore):
Expand Down Expand Up @@ -284,8 +330,8 @@ def upload(
if isinstance(v, list) and len(v) == 3:
url = _apply_templates(v[0], rfs.get('templates', {}))
if not url.startswith("http://") and not url.startswith("https://"):
local_path = url
blobs_to_upload.add(local_path)
local_path_for_blob = url
blobs_to_upload.add(local_path_for_blob)
# Upload each of the local blobs using the given upload function and get a mapping from
# the original file paths to the URLs of the uploaded files
blob_mapping = _upload_blobs(blobs_to_upload, on_upload_blob=on_upload_blob)
Expand Down Expand Up @@ -358,9 +404,14 @@ def close(self):
self.flush()

def flush(self):
if self._mode != 'r' and self._local_file_path is not None:
rfs = self.to_reference_file_system()
_write_rfs_to_file(rfs=rfs, output_file_name=self._local_file_path)
if self._mode != 'r' and self._source_url_or_path is not None:
is_url = self._source_url_or_path.startswith("http://") or self._source_url_or_path.startswith("https://")
if not is_url:
rfs = self.to_reference_file_system()
if self._source_is_lindi1_format:
_write_rfs_dict_to_lindi1_file(rfs=rfs, output_file_name=self._source_url_or_path)
else:
_write_rfs_to_file(rfs=rfs, output_file_name=self._source_url_or_path)

def __enter__(self): # type: ignore
return self
Expand Down Expand Up @@ -512,16 +563,6 @@ def staging_store(self):
return store


def _download_file(url: str, filename: str) -> None:
    """Download the resource at ``url`` and write it to ``filename``.

    The request is sent with a browser-like User-Agent header. The response
    body is streamed to disk in chunks rather than being read fully into
    memory first, so large downloads do not inflate memory use.

    Parameters
    ----------
    url : str
        The URL to fetch, passed to urllib.request.urlopen.
    filename : str
        Path of the local file to create or overwrite with the response body.
    """
    import shutil  # local import: only this helper needs it

    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
    }
    req = urllib.request.Request(url, headers=headers)
    # Open the response first so that a failed request does not leave an
    # empty output file behind (same ordering as before).
    with urllib.request.urlopen(req) as response, open(filename, "wb") as f:
        shutil.copyfileobj(response, f)


def _recursive_copy(src_item: Union[h5py.Group, h5py.Dataset], dest: h5py.File, name: str) -> None:
if isinstance(src_item, h5py.Group):
dst_item = dest.create_group(name)
Expand Down
6 changes: 5 additions & 1 deletion lindi/LindiH5pyFile/LindiReferenceFileSystemStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class LindiReferenceFileSystemStore(ZarrStore):
It is okay for rfs to be modified outside of this class, and the changes
will be reflected immediately in the store.
"""
def __init__(self, rfs: dict, *, mode: Literal["r", "r+"] = "r+", local_cache: Union[LocalCache, None] = None):
def __init__(self, rfs: dict, *, mode: Literal["r", "r+"] = "r+", local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None):
"""
Create a LindiReferenceFileSystemStore.
Expand Down Expand Up @@ -114,6 +114,7 @@ def __init__(self, rfs: dict, *, mode: Literal["r", "r+"] = "r+", local_cache: U
self.rfs = rfs
self.mode = mode
self.local_cache = local_cache
self._source_url_or_path = _source_url_or_path

# These methods are overridden from MutableMapping
def __contains__(self, key: object):
Expand Down Expand Up @@ -155,6 +156,9 @@ def _get_helper(self, key: str):
x = self.local_cache.get_remote_chunk(url=url, offset=offset, size=length)
if x is not None:
return x
if url == '.' and self._source_url_or_path:
# this is where the file refers to bytes in the same file (lindi binary format)
url = self._source_url_or_path
val = _read_bytes_from_url_or_path(url, offset, length)
if self.local_cache is not None:
try:
Expand Down
Loading

0 comments on commit e7345bd

Please sign in to comment.