Skip to content

Commit

Permalink
Merge pull request #42 from NeurodataWithoutBorders/staging-store
Browse files Browse the repository at this point in the history
Implement staging store for preparing .nwb.json for upload to cloud
  • Loading branch information
magland authored Apr 19, 2024
2 parents bc71a9d + 5b4b6f4 commit 40c94a1
Show file tree
Hide file tree
Showing 10 changed files with 483 additions and 22 deletions.
17 changes: 9 additions & 8 deletions lindi/LindiH5ZarrStore/LindiH5ZarrStore.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
import base64
from typing import Union, List, IO, Any, Dict, Literal
from typing import Union, List, IO, Any, Dict
from dataclasses import dataclass
import numpy as np
import zarr
Expand All @@ -12,7 +12,8 @@
_get_chunk_byte_range,
_get_byte_range_for_contiguous_dataset,
_join,
_get_chunk_names_for_dataset
_get_chunk_names_for_dataset,
_write_rfs_to_file,
)
from ..conversion.attr_conversion import h5_to_zarr_attr
from ..conversion.reformat_json import reformat_json
Expand Down Expand Up @@ -460,17 +461,17 @@ def listdir(self, path: str = "") -> List[str]:
else:
return []

def to_file(self, file_name: str, *, file_type: Literal["zarr.json"] = "zarr.json"):
def write_reference_file_system(self, output_file_name: str):
"""Write a reference file system corresponding to this store to a file.
This can then be loaded using LindiH5pyFile.from_reference_file_system(file_name)
"""
if file_type != "zarr.json":
raise Exception(f"Unsupported file type: {file_type}")

ret = self.to_reference_file_system()
with open(file_name, "w") as f:
json.dump(ret, f, indent=2)
if not output_file_name.endswith(".lindi.json"):
raise Exception("The output file name must end with .lindi.json")

rfs = self.to_reference_file_system()
_write_rfs_to_file(rfs=rfs, output_file_name=output_file_name)

def to_reference_file_system(self) -> dict:
"""Create a reference file system corresponding to this store.
Expand Down
8 changes: 8 additions & 0 deletions lindi/LindiH5ZarrStore/_util.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import IO, List
import json
import numpy as np
import h5py

Expand Down Expand Up @@ -83,3 +84,10 @@ def _get_chunk_names_for_dataset(chunk_coords_shape: List[int]) -> List[str]:
for name0 in names0:
names.append(f"{i}.{name0}")
return names


def _write_rfs_to_file(*, rfs: dict, output_file_name: str):
    """Write a reference file system to a file as JSON.

    The dictionary is serialized with two-space indentation and sorted keys.
    Serialization is completed before the output file is opened, so an
    ``rfs`` that cannot be encoded raises without truncating or partially
    overwriting a file that already exists at ``output_file_name``.

    Args:
        rfs: The reference file system dictionary to serialize.
        output_file_name: Path of the file to create or overwrite.

    Raises:
        TypeError: If ``rfs`` contains an object that json cannot serialize.
        OSError: If the output file cannot be opened or written.
    """
    # Encode first: opening with "w" truncates immediately, and streaming
    # json.dump into the open handle would leave a half-written file behind
    # if encoding failed partway through.
    text = json.dumps(rfs, indent=2, sort_keys=True)
    # json.dumps escapes non-ASCII characters by default, so the payload is
    # plain ASCII; the encoding is pinned so the result never depends on the
    # platform's locale default.
    with open(output_file_name, "w", encoding="utf-8") as f:
        f.write(text)
31 changes: 26 additions & 5 deletions lindi/LindiH5pyFile/LindiH5pyFile.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
from .LindiH5pyReference import LindiH5pyReference
from .LindiReferenceFileSystemStore import LindiReferenceFileSystemStore

from ..LindiStagingStore.StagingArea import StagingArea
from ..LindiStagingStore.LindiStagingStore import LindiStagingStore


class LindiH5pyFile(h5py.File):
def __init__(self, _file_object: Union[h5py.File, zarr.Group], *, _zarr_store: Union[ZarrStore, None] = None, _mode: Literal["r", "r+"] = "r"):
Expand All @@ -29,7 +32,7 @@ def __init__(self, _file_object: Union[h5py.File, zarr.Group], *, _zarr_store: U
self._id = f'{id(self._file_object)}/'

@staticmethod
def from_reference_file_system(rfs: Union[dict, str], mode: Literal["r", "r+"] = "r"):
def from_reference_file_system(rfs: Union[dict, str], mode: Literal["r", "r+"] = "r", staging_area: Union[StagingArea, None] = None):
"""
Create a LindiH5pyFile from a reference file system.
Expand All @@ -47,6 +50,10 @@ def from_reference_file_system(rfs: Union[dict, str], mode: Literal["r", "r+"] =
to_reference_file_system() to export the updated reference file
system to the same file or a new file.
"""
if staging_area is not None:
if mode not in ['r+']:
raise Exception("Staging area cannot be used in read-only mode")

if isinstance(rfs, str):
if rfs.startswith("http") or rfs.startswith("https"):
with tempfile.TemporaryDirectory() as tmpdir:
Expand All @@ -55,15 +62,17 @@ def from_reference_file_system(rfs: Union[dict, str], mode: Literal["r", "r+"] =
with open(filename, "r") as f:
data = json.load(f)
assert isinstance(data, dict) # prevent infinite recursion
return LindiH5pyFile.from_reference_file_system(data, mode=mode)
return LindiH5pyFile.from_reference_file_system(data, mode=mode, staging_area=staging_area)
else:
with open(rfs, "r") as f:
data = json.load(f)
assert isinstance(data, dict) # prevent infinite recursion
return LindiH5pyFile.from_reference_file_system(data, mode=mode)
return LindiH5pyFile.from_reference_file_system(data, mode=mode, staging_area=staging_area)
elif isinstance(rfs, dict):
# This store does not need to be closed
store = LindiReferenceFileSystemStore(rfs)
if staging_area:
store = LindiStagingStore(base_store=store, staging_area=staging_area)
return LindiH5pyFile.from_zarr_store(store, mode=mode)
else:
raise Exception(f"Unhandled type for rfs: {type(rfs)}")
Expand Down Expand Up @@ -131,9 +140,12 @@ def to_reference_file_system(self):
"""
if self._zarr_store is None:
raise Exception("Cannot convert to reference file system without zarr store")
if not isinstance(self._zarr_store, LindiReferenceFileSystemStore):
zarr_store = self._zarr_store
if isinstance(zarr_store, LindiStagingStore):
zarr_store = zarr_store._base_store
if not isinstance(zarr_store, LindiReferenceFileSystemStore):
raise Exception(f"Unexpected type for zarr store: {type(self._zarr_store)}")
rfs = self._zarr_store.rfs
rfs = zarr_store.rfs
rfs_copy = json.loads(json.dumps(rfs))
LindiReferenceFileSystemStore.replace_meta_file_contents_with_dicts(rfs_copy)
return rfs_copy
Expand Down Expand Up @@ -341,6 +353,15 @@ def require_dataset(self, name, shape, dtype, exact=False, **kwds):
raise Exception("Cannot require dataset in read-only mode")
return self._the_group.require_dataset(name, shape, dtype, exact=exact, **kwds)

##############################
# staging store
@property
def staging_store(self):
    """Return this file's zarr store when it is a LindiStagingStore.

    Returns None when the underlying store is any other type (including
    when no store is set).
    """
    zarr_store = self._zarr_store
    return zarr_store if isinstance(zarr_store, LindiStagingStore) else None


def _download_file(url: str, filename: str) -> None:
headers = {
Expand Down
Loading

0 comments on commit 40c94a1

Please sign in to comment.