Skip to content

Commit

Permalink
lindi tar format
Browse files Browse the repository at this point in the history
  • Loading branch information
magland committed Aug 2, 2024
1 parent 3b9f2e9 commit 34a21f9
Show file tree
Hide file tree
Showing 6 changed files with 609 additions and 61 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
*.lindi
*.lindi.json*
*.nwb

Expand Down
21 changes: 21 additions & 0 deletions examples/write_lindi_binary.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import numpy as np
import lindi


def write_lindi_binary():
    """
    Create test.lindi in write mode, set one attribute, and fill a float32
    dataset with random values.
    """
    n_rows, n_cols = 1000, 1000
    with lindi.LindiH5pyFile.from_lindi_file('test.lindi', mode='w') as lindi_file:
        lindi_file.attrs['test'] = 42
        dataset = lindi_file.create_dataset('data', shape=(n_rows, n_cols), dtype='f4')
        # Assigning through [...] writes the whole array into the dataset
        dataset[...] = np.random.rand(n_rows, n_cols)


def test_read():
    """
    Open test.lindi read-only and print the 'test' attribute and the first
    element of the 'data' dataset.
    """
    # Use the file as a context manager (as write_lindi_binary does) so it is
    # released even if one of the reads below raises.
    with lindi.LindiH5pyFile.from_lindi_file('test.lindi', mode='r') as f:
        print(f.attrs['test'])
        print(f['data'][0, 0])


# When run as a script: write test.lindi, then read it back and print a
# couple of values from it.
if __name__ == "__main__":
    write_lindi_binary()
    test_read()
231 changes: 184 additions & 47 deletions lindi/LindiH5pyFile/LindiH5pyFile.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

from ..LindiH5ZarrStore._util import _write_rfs_to_file

from ..tar.lindi_tar import LindiTarFile
from ..tar.LindiTarStore import LindiTarStore


LindiFileMode = Literal["r", "r+", "w", "w-", "x", "a"]

Expand All @@ -29,7 +32,7 @@


class LindiH5pyFile(h5py.File):
def __init__(self, _zarr_group: zarr.Group, *, _zarr_store: Union[ZarrStore, None] = None, _mode: LindiFileMode = "r", _local_cache: Union[LocalCache, None] = None, _local_file_path: Union[str, None] = None):
def __init__(self, _zarr_group: zarr.Group, *, _zarr_store: Union[ZarrStore, None] = None, _mode: LindiFileMode = "r", _local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_tar_file: Union[LindiTarFile, None] = None):
"""
Do not use this constructor directly. Instead, use: from_lindi_file,
from_h5py_file, from_reference_file_system, from_zarr_store, or
Expand All @@ -40,22 +43,25 @@ def __init__(self, _zarr_group: zarr.Group, *, _zarr_store: Union[ZarrStore, Non
self._mode: LindiFileMode = _mode
self._the_group = LindiH5pyGroup(_zarr_group, self)
self._local_cache = _local_cache
self._local_file_path = _local_file_path
self._source_url_or_path = _source_url_or_path
self._source_tar_file = _source_tar_file

# see comment in LindiH5pyGroup
self._id = f'{id(self._zarr_group)}/'

@staticmethod
def from_lindi_file(url_or_path: str, *, mode: LindiFileMode = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None):
    """
    Create a LindiH5pyFile from a URL or path to a LINDI file.

    The file may be a JSON reference file system (.lindi.json) or a
    tar-based LINDI file; the format of an existing file is detected from
    its content rather than from its name. When a new local file has to be
    created, a name ending in ".tar" or ".lindi" selects the tar format.

    For a description of parameters, see from_reference_file_system().
    """
    # All format detection and mode handling lives in
    # from_reference_file_system(); this is a thin, explicitly named entry
    # point for the string (URL or path) case.
    return LindiH5pyFile.from_reference_file_system(
        url_or_path,
        mode=mode,
        staging_area=staging_area,
        local_cache=local_cache
    )

@staticmethod
def from_hdf5_file(
Expand Down Expand Up @@ -99,7 +105,7 @@ def from_hdf5_file(
)

@staticmethod
def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMode = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None, local_file_path: Union[str, None] = None):
def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMode = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_tar_file: Union[LindiTarFile, None] = None):
"""
Create a LindiH5pyFile from a reference file system.
Expand All @@ -116,11 +122,10 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMo
is only used in write mode, by default None.
local_cache : Union[LocalCache, None], optional
The local cache to use for caching data, by default None.
local_file_path : Union[str, None], optional
If rfs is not a string or is a remote url, this is the path to the
local file for the purpose of writing to it. It is required in this
case if mode is not "r". If rfs is a string and not a remote url, it
must be equal to local_file_path if provided.
_source_url_or_path : Union[str, None], optional
Internal use only
_source_tar_file : Union[LindiTarFile, None], optional
Internal use only
"""
if rfs is None:
rfs = {
Expand All @@ -132,25 +137,23 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMo
}

if isinstance(rfs, str):
if _source_url_or_path is not None:
raise Exception("_source_file_path is not None even though rfs is a string")
if _source_tar_file is not None:
raise Exception("_source_tar_file is not None even though rfs is a string")
rfs_is_url = rfs.startswith("http://") or rfs.startswith("https://")
if local_file_path is not None and not rfs_is_url and rfs != local_file_path:
raise Exception(f"rfs is not a remote url, so local_file_path must be the same as rfs, but got: {rfs} and {local_file_path}")
if rfs_is_url:
with tempfile.TemporaryDirectory() as tmpdir:
filename = f"{tmpdir}/temp.lindi.json"
_download_file(rfs, filename)
with open(filename, "r") as f:
data = json.load(f)
assert isinstance(data, dict) # prevent infinite recursion
return LindiH5pyFile.from_reference_file_system(data, mode=mode, staging_area=staging_area, local_cache=local_cache, local_file_path=local_file_path)
data, tar_file = _load_rfs_from_url(rfs)
return LindiH5pyFile.from_reference_file_system(
data,
mode=mode,
staging_area=staging_area,
local_cache=local_cache,
_source_tar_file=tar_file
)
else:
empty_rfs = {
"refs": {
'.zgroup': {
'zarr_format': 2
}
},
}
# local file
need_to_create_empty_file = False
if mode == "r":
# Readonly, file must exist (default)
if not os.path.exists(rfs):
Expand All @@ -161,36 +164,59 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMo
raise Exception(f"File does not exist: {rfs}")
elif mode == "w":
# Create file, truncate if exists
with open(rfs, "w") as f:
json.dump(empty_rfs, f)
need_to_create_empty_file = True

elif mode in ["w-", "x"]:
# Create file, fail if exists
if os.path.exists(rfs):
raise Exception(f"File already exists: {rfs}")
with open(rfs, "w") as f:
json.dump(empty_rfs, f)
need_to_create_empty_file = True
elif mode == "a":
# Read/write if exists, create otherwise
if os.path.exists(rfs):
with open(rfs, "r") as f:
data = json.load(f)
if not os.path.exists(rfs):
need_to_create_empty_file = True
else:
raise Exception(f"Unhandled mode: {mode}")
with open(rfs, "r") as f:
data = json.load(f)
if need_to_create_empty_file:
tar = rfs.endswith(".tar") or rfs.endswith(".lindi")
_create_empty_lindi_file(rfs, tar=tar)
data, tar_file = _load_rfs_from_local_file(rfs)
assert isinstance(data, dict) # prevent infinite recursion
return LindiH5pyFile.from_reference_file_system(data, mode=mode, staging_area=staging_area, local_cache=local_cache, local_file_path=local_file_path)
return LindiH5pyFile.from_reference_file_system(
data,
mode=mode,
staging_area=staging_area,
local_cache=local_cache,
_source_url_or_path=rfs,
_source_tar_file=tar_file
)
elif isinstance(rfs, dict):
# This store does not need to be closed
store = LindiReferenceFileSystemStore(rfs, local_cache=local_cache)
store = LindiReferenceFileSystemStore(
rfs,
local_cache=local_cache,
_source_url_or_path=_source_url_or_path,
_source_tar_file=_source_tar_file
)
source_is_url = _source_url_or_path is not None and (_source_url_or_path.startswith("http://") or _source_url_or_path.startswith("https://"))
if staging_area:
if _source_tar_file and not source_is_url:
raise Exception("Cannot use staging area when source is a local tar file")
store = LindiStagingStore(base_store=store, staging_area=staging_area)
return LindiH5pyFile.from_zarr_store(store, mode=mode, local_file_path=local_file_path, local_cache=local_cache)
elif _source_url_or_path and _source_tar_file and not source_is_url:
store = LindiTarStore(base_store=store, tar_file=_source_tar_file)
return LindiH5pyFile.from_zarr_store(
store,
mode=mode,
local_cache=local_cache,
_source_url_or_path=_source_url_or_path,
_source_tar_file=_source_tar_file
)
else:
raise Exception(f"Unhandled type for rfs: {type(rfs)}")

@staticmethod
def from_zarr_store(zarr_store: ZarrStore, mode: LindiFileMode = "r", local_cache: Union[LocalCache, None] = None, local_file_path: Union[str, None] = None):
def from_zarr_store(zarr_store: ZarrStore, mode: LindiFileMode = "r", local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_tar_file: Union[LindiTarFile, None] = None):
"""
Create a LindiH5pyFile from a zarr store.
Expand All @@ -207,10 +233,10 @@ def from_zarr_store(zarr_store: ZarrStore, mode: LindiFileMode = "r", local_cach
# does not need to be closed
zarr_group = zarr.open(store=zarr_store, mode=mode)
assert isinstance(zarr_group, zarr.Group)
return LindiH5pyFile.from_zarr_group(zarr_group, _zarr_store=zarr_store, mode=mode, local_cache=local_cache, local_file_path=local_file_path)
return LindiH5pyFile.from_zarr_group(zarr_group, _zarr_store=zarr_store, mode=mode, local_cache=local_cache, _source_url_or_path=_source_url_or_path, _source_tar_file=_source_tar_file)

@staticmethod
def from_zarr_group(zarr_group: zarr.Group, *, mode: LindiFileMode = "r", _zarr_store: Union[ZarrStore, None] = None, local_cache: Union[LocalCache, None] = None, local_file_path: Union[str, None] = None):
def from_zarr_group(zarr_group: zarr.Group, *, mode: LindiFileMode = "r", _zarr_store: Union[ZarrStore, None] = None, local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_tar_file: Union[LindiTarFile, None] = None):
"""
Create a LindiH5pyFile from a zarr group.
Expand All @@ -228,7 +254,7 @@ def from_zarr_group(zarr_group: zarr.Group, *, mode: LindiFileMode = "r", _zarr_
See from_zarr_store().
"""
return LindiH5pyFile(zarr_group, _zarr_store=_zarr_store, _mode=mode, _local_cache=local_cache, _local_file_path=local_file_path)
return LindiH5pyFile(zarr_group, _zarr_store=_zarr_store, _mode=mode, _local_cache=local_cache, _source_url_or_path=_source_url_or_path, _source_tar_file=_source_tar_file)

def to_reference_file_system(self):
"""
Expand All @@ -241,6 +267,8 @@ def to_reference_file_system(self):
if isinstance(zarr_store, LindiStagingStore):
zarr_store.consolidate_chunks()
zarr_store = zarr_store._base_store
if isinstance(zarr_store, LindiTarStore):
zarr_store = zarr_store._base_store
if isinstance(zarr_store, LindiH5ZarrStore):
return zarr_store.to_reference_file_system()
if not isinstance(zarr_store, LindiReferenceFileSystemStore):
Expand Down Expand Up @@ -358,9 +386,17 @@ def close(self):
self.flush()

def flush(self):
    """
    Write the current reference file system back to the source file.

    Does nothing when the file was opened with mode 'r' or when it has no
    source URL/path. Otherwise the reference file system is regenerated
    and written either into the source tar file (when there is one) or to
    the source path as a plain JSON file.

    Raises
    ------
    Exception
        If the source is an http(s) URL, which cannot be written to.
    """
    if self._mode == 'r' or self._source_url_or_path is None:
        return
    if self._source_url_or_path.startswith(("http://", "https://")):
        raise Exception("Cannot write to URL")
    rfs = self.to_reference_file_system()
    # A missing tar file is a valid state: it means the source is a plain
    # .lindi.json file, which is rewritten as a whole below.
    if self._source_tar_file:
        self._source_tar_file.write_rfs(rfs)
    else:
        _write_rfs_to_file(rfs=rfs, output_file_name=self._source_url_or_path)

def __enter__(self): # type: ignore
return self
Expand Down Expand Up @@ -611,3 +647,104 @@ def _format_size_bytes(size_bytes: int) -> str:
return f"{size_bytes / 1024 / 1024:.1f} MB"
else:
return f"{size_bytes / 1024 / 1024 / 1024:.1f} GB"


def _load_rfs_from_url(url: str):
    """
    Load a reference file system from a remote LINDI file.

    Parameters
    ----------
    url : str
        URL of a .lindi.json file or of a tar-based LINDI file.

    Returns
    -------
    tuple
        (rfs, tar_file) where rfs is the parsed reference file system and
        tar_file is a LindiTarFile bound to `url` when the remote file is a
        tar archive, or None when it is a plain JSON file.
    """
    file_size = _get_file_size_of_remote_file(url)
    if file_size >= 1024 * 1024 * 2:
        # Large file: fetch only the first tar header block to decide the
        # format, so a large tar archive is never downloaded in full.
        tar_entry_buf = _download_file_byte_range(url, 0, 512)
        if _check_is_tar_header(tar_entry_buf[:512]):
            tar_file = LindiTarFile(url)
            rfs_json = tar_file.read_file("lindi.json")
            return json.loads(rfs_json), tar_file
        # Not a tar archive, so it must be a regular json file; fall through
        # to the full download below.

    # Small file (or large JSON file): download the whole thing and parse it
    # locally.
    with tempfile.TemporaryDirectory() as tmpdir:
        tmp_fname = f"{tmpdir}/temp.lindi.json"
        _download_file(url, tmp_fname)
        data, local_tar_file = _load_rfs_from_local_file(tmp_fname)

    if local_tar_file is None:
        return data, None
    # The tar file opened above refers to the temporary download, which no
    # longer exists once the temporary directory is removed. Hand back a tar
    # file bound to the URL instead so later reads go to the real source.
    return data, LindiTarFile(url)


def _load_rfs_from_local_file(fname: str):
file_size = os.path.getsize(fname)
if file_size >= 512:
# Read first bytes to check if it's a tar file
with open(fname, "rb") as f:
tar_entry_buf = f.read(512)
is_tar = _check_is_tar_header(tar_entry_buf)
if is_tar:
tar_file = LindiTarFile(fname)
rfs_json = tar_file.read_file("lindi.json")
rfs = json.loads(rfs_json)
return rfs, tar_file

# Must be a regular json file
with open(fname, "r") as f:
return json.load(f), None


def _check_is_tar_header(header_buf: bytes) -> bool:
if len(header_buf) < 512:
return False

# We're only going to support ustar format
# get the ustar indicator at bytes 257-262
if header_buf[257:262] == b"ustar" and header_buf[262] == 0:
# Note that it's unlikely but possible that a json file could have the
# string "ustar" at these bytes, but it would not have a null byte at
# byte 262
return True

# Check for any 0 bytes in the header
if b"\0" in header_buf:
print(header_buf[257:262])
raise Exception("Problem with lindi file: 0 byte found in header, but not ustar tar format")

return False


def _get_file_size_of_remote_file(url: str) -> int:
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
}
req = urllib.request.Request(url, headers=headers)
with urllib.request.urlopen(req) as response:
return int(response.headers['Content-Length'])


def _download_file_byte_range(url: str, start: int, end: int) -> bytes:
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3",
"Range": f"bytes={start}-{end - 1}"
}
req = urllib.request.Request(url, headers=headers)
with urllib.request.urlopen(req) as response:
return response.read()


def _create_empty_lindi_file(fname: str, *, tar: bool = False):
empty_rfs = {
"refs": {
".zgroup": {
"zarr_format": 2
}
}
}
if tar:
LindiTarFile.create(fname)
tf = LindiTarFile(fname)
tf.write_rfs(empty_rfs)
else:
with open(fname, "w") as f:
json.dump(empty_rfs, f)
Loading

0 comments on commit 34a21f9

Please sign in to comment.