diff --git a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py index 9858248..4bdc420 100644 --- a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py +++ b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py @@ -1,6 +1,6 @@ import json import base64 -from typing import Union, List, IO, Any, Dict, Literal +from typing import Union, List, IO, Any, Dict from dataclasses import dataclass import numpy as np import zarr @@ -12,7 +12,8 @@ _get_chunk_byte_range, _get_byte_range_for_contiguous_dataset, _join, - _get_chunk_names_for_dataset + _get_chunk_names_for_dataset, + _write_rfs_to_file, ) from ..conversion.attr_conversion import h5_to_zarr_attr from ..conversion.reformat_json import reformat_json @@ -460,17 +461,17 @@ def listdir(self, path: str = "") -> List[str]: else: return [] - def to_file(self, file_name: str, *, file_type: Literal["zarr.json"] = "zarr.json"): + def write_reference_file_system(self, output_file_name: str): """Write a reference file system corresponding to this store to a file. This can then be loaded using LindiH5pyFile.from_reference_file_system(file_name) """ - if file_type != "zarr.json": - raise Exception(f"Unsupported file type: {file_type}") - ret = self.to_reference_file_system() - with open(file_name, "w") as f: - json.dump(ret, f, indent=2) + if not output_file_name.endswith(".lindi.json"): + raise Exception("The output file name must end with .lindi.json") + + rfs = self.to_reference_file_system() + _write_rfs_to_file(rfs=rfs, output_file_name=output_file_name) def to_reference_file_system(self) -> dict: """Create a reference file system corresponding to this store. diff --git a/lindi/LindiH5ZarrStore/_util.py b/lindi/LindiH5ZarrStore/_util.py index 0866292..6deb5c0 100644 --- a/lindi/LindiH5ZarrStore/_util.py +++ b/lindi/LindiH5ZarrStore/_util.py @@ -1,4 +1,5 @@ from typing import IO, List +import json import numpy as np import h5py @@ -83,3 +84,10 @@ def _get_chunk_names_for_dataset(chunk_coords_shape: List[int]) -> List[str]: for name0 in names0: names.append(f"{i}.{name0}") return names + + +def _write_rfs_to_file(*, rfs: dict, output_file_name: str): + """Write a reference file system to a file. + """ + with open(output_file_name, "w") as f: + json.dump(rfs, f, indent=2, sort_keys=True) diff --git a/lindi/LindiH5pyFile/LindiH5pyFile.py b/lindi/LindiH5pyFile/LindiH5pyFile.py index 9a93d28..e8f9a50 100644 --- a/lindi/LindiH5pyFile/LindiH5pyFile.py +++ b/lindi/LindiH5pyFile/LindiH5pyFile.py @@ -12,6 +12,9 @@ from .LindiH5pyReference import LindiH5pyReference from .LindiReferenceFileSystemStore import LindiReferenceFileSystemStore +from ..LindiStagingStore.StagingArea import StagingArea +from ..LindiStagingStore.LindiStagingStore import LindiStagingStore + class LindiH5pyFile(h5py.File): def __init__(self, _file_object: Union[h5py.File, zarr.Group], *, _zarr_store: Union[ZarrStore, None] = None, _mode: Literal["r", "r+"] = "r"): @@ -29,7 +32,7 @@ def __init__(self, _file_object: Union[h5py.File, zarr.Group], *, _zarr_store: U self._id = f'{id(self._file_object)}/' @staticmethod - def from_reference_file_system(rfs: Union[dict, str], mode: Literal["r", "r+"] = "r"): + def from_reference_file_system(rfs: Union[dict, str], mode: Literal["r", "r+"] = "r", staging_area: Union[StagingArea, None] = None): """ Create a LindiH5pyFile from a reference file system. @@ -47,6 +50,10 @@ def from_reference_file_system(rfs: Union[dict, str], mode: Literal["r", "r+"] = to_reference_file_system() to export the updated reference file system to the same file or a new file. """ + if staging_area is not None: + if mode not in ['r+']: + raise Exception("Staging area cannot be used in read-only mode") + if isinstance(rfs, str): if rfs.startswith("http") or rfs.startswith("https"): with tempfile.TemporaryDirectory() as tmpdir: @@ -55,15 +62,17 @@ def from_reference_file_system(rfs: Union[dict, str], mode: Literal["r", "r+"] = with open(filename, "r") as f: data = json.load(f) assert isinstance(data, dict) # prevent infinite recursion - return LindiH5pyFile.from_reference_file_system(data, mode=mode) + return LindiH5pyFile.from_reference_file_system(data, mode=mode, staging_area=staging_area) else: with open(rfs, "r") as f: data = json.load(f) assert isinstance(data, dict) # prevent infinite recursion - return LindiH5pyFile.from_reference_file_system(data, mode=mode) + return LindiH5pyFile.from_reference_file_system(data, mode=mode, staging_area=staging_area) elif isinstance(rfs, dict): # This store does not need to be closed store = LindiReferenceFileSystemStore(rfs) + if staging_area: + store = LindiStagingStore(base_store=store, staging_area=staging_area) return LindiH5pyFile.from_zarr_store(store, mode=mode) else: raise Exception(f"Unhandled type for rfs: {type(rfs)}") @@ -131,9 +140,12 @@ def to_reference_file_system(self): """ if self._zarr_store is None: raise Exception("Cannot convert to reference file system without zarr store") - if not isinstance(self._zarr_store, LindiReferenceFileSystemStore): + zarr_store = self._zarr_store + if isinstance(zarr_store, LindiStagingStore): + zarr_store = zarr_store._base_store + if not isinstance(zarr_store, LindiReferenceFileSystemStore): raise Exception(f"Unexpected type for zarr store: {type(self._zarr_store)}") - rfs = self._zarr_store.rfs + rfs = zarr_store.rfs rfs_copy = json.loads(json.dumps(rfs)) LindiReferenceFileSystemStore.replace_meta_file_contents_with_dicts(rfs_copy) return rfs_copy @@ -341,6 +353,15 @@ def require_dataset(self, name, shape, dtype, exact=False, **kwds): raise Exception("Cannot require dataset in read-only mode") return self._the_group.require_dataset(name, shape, dtype, exact=exact, **kwds) + ############################## + # staging store + @property + def staging_store(self): + store = self._zarr_store + if not isinstance(store, LindiStagingStore): + return None + return store + def _download_file(url: str, filename: str) -> None: headers = { diff --git a/lindi/LindiStagingStore/LindiStagingStore.py b/lindi/LindiStagingStore/LindiStagingStore.py new file mode 100644 index 0000000..13f2e48 --- /dev/null +++ b/lindi/LindiStagingStore/LindiStagingStore.py @@ -0,0 +1,267 @@ +from typing import Callable +import json +import tempfile +import os +from zarr.storage import Store as ZarrStore +from ..LindiH5pyFile.LindiReferenceFileSystemStore import LindiReferenceFileSystemStore +from .StagingArea import StagingArea, _random_str +from ..LindiH5ZarrStore._util import _write_rfs_to_file + + +# Accepts a string path to a file, uploads (or copies) it somewhere, and returns a string URL +# (or local path) +UploadFileFunc = Callable[[str], str] + + +class LindiStagingStore(ZarrStore): + """ + A Zarr store that allows supplementing a base LindiReferenceFileSystemStore + where the large data blobs are stored in a staging area. After writing new + data to the store, the data blobs can be consolidated into larger files and + then uploaded to a custom storage system, for example DANDI or a cloud + bucket. + """ + def __init__(self, *, base_store: LindiReferenceFileSystemStore, staging_area: StagingArea): + """ + Create a LindiStagingStore. + + Parameters + ---------- + base_store : LindiReferenceFileSystemStore + The base store that this store supplements. + staging_area : StagingArea + The staging area where large data blobs are stored. + """ + self._base_store = base_store + self._staging_area = staging_area + + def __getitem__(self, key: str): + return self._base_store.__getitem__(key) + + def __setitem__(self, key: str, value: bytes): + key_parts = key.split("/") + key_base_name = key_parts[-1] + if key_base_name.startswith('.') or key_base_name.endswith('.json'): # always inline .zattrs, .zgroup, .zarray, zarr.json + inline = True + else: + # presumably it is a chunk of an array + if not isinstance(value, bytes): + raise ValueError("Value must be bytes") + size = len(value) + inline = size < 1000 # this should be a configurable threshold + if inline: + # If inline, save in memory + return self._base_store.__setitem__(key, value) + else: + # If not inline, save it as a file in the staging directory + key_without_initial_slash = key if not key.startswith("/") else key[1:] + stored_file_path = self._staging_area.store_file(key_without_initial_slash, value) + + self._set_ref_reference(key_without_initial_slash, stored_file_path, 0, len(value)) + + def __delitem__(self, key: str): + # We don't delete the file from the staging directory, because that + # would be dangerous if the file was part of a consolidated file. + return self._base_store.__delitem__(key) + + def __iter__(self): + return self._base_store.__iter__() + + def __len__(self): + return self._base_store.__len__() + + # These methods are overridden from BaseStore + def is_readable(self): + return True + + def is_writeable(self): + return True + + def is_listable(self): + return True + + def is_erasable(self): + return False + + def _set_ref_reference(self, key: str, filename: str, offset: int, size: int): + rfs = self._base_store.rfs + if 'refs' not in rfs: + # this shouldn't happen, but we'll be defensive + rfs['refs'] = {} + rfs['refs'][key] = [ + filename, + offset, + size + ] + + def upload( + self, + *, + on_upload_blob: UploadFileFunc, + on_upload_main: UploadFileFunc, + consolidate_chunks: bool = True + ): + """ + Consolidate the chunks in the staging area, upload them to a storage + system, updating the references in the base store, and then upload the + updated reference file system .json file. + + Parameters + ---------- + on_upload_blob : StoreFileFunc + A function that takes a string path to a blob file, uploads or copies it + somewhere, and returns a string URL (or local path). + on_upload_main : StoreFileFunc + A function that takes a string path to the main .json file, stores + it somewhere, and returns a string URL (or local path). + consolidate_chunks : bool + If True (the default), consolidate the chunks in the staging area + before uploading. + + Returns + ------- + str + The URL (or local path) of the uploaded reference file system .json + file. + """ + if consolidate_chunks: + self.consolidate_chunks() + rfs = self._base_store.rfs + rfs = json.loads(json.dumps(rfs)) # deep copy + LindiReferenceFileSystemStore.replace_meta_file_contents_with_dicts(rfs) + blob_mapping = _upload_directory_of_blobs(self._staging_area.directory, on_upload_blob=on_upload_blob) + for k, v in rfs['refs'].items(): + if isinstance(v, list) and len(v) == 3: + url1 = v[0] + if url1.startswith(self._staging_area.directory + '/'): + url2 = blob_mapping.get(url1, None) + if url2 is None: + raise ValueError(f"Could not find url in blob mapping: {url1}") + rfs['refs'][k][0] = url2 + with tempfile.TemporaryDirectory() as tmpdir: + rfs_fname = f"{tmpdir}/rfs.lindi.json" + _write_rfs_to_file(rfs=rfs, output_file_name=rfs_fname) + return on_upload_main(rfs_fname) + + def consolidate_chunks(self): + """ + Consolidate the chunks in the staging area. + + This method is called by `upload` if `consolidate_chunks` is True. + """ + rfs = self._base_store.rfs + refs_keys_by_reference_parent_path = {} + for k, v in rfs['refs'].items(): + if isinstance(v, list) and len(v) == 3: + url = v[0] + if not url.startswith(self._staging_area.directory + '/'): + continue + parent_path = os.path.dirname(url) + if parent_path not in refs_keys_by_reference_parent_path: + refs_keys_by_reference_parent_path[parent_path] = [] + refs_keys_by_reference_parent_path[parent_path].append(k) + for root, dirs, files1 in os.walk(self._staging_area._directory): + files = [ + f for f in files1 + if not f.startswith('.') and not f.endswith('.json') and not f.startswith('consolidated.') + ] + if len(files) <= 1: + continue + refs_keys_for_this_dir = refs_keys_by_reference_parent_path.get(root, []) + if len(refs_keys_for_this_dir) <= 1: + continue + + # sort so that the files are in order 0.0.0, 0.0.1, 0.0.2, ... + files = _sort_by_chunk_key(files) + + print(f'Consolidating {len(files)} files in {root}') + + offset = 0 + offset_maps = {} + consolidated_id = _random_str(8) + consolidated_index = 0 + max_size_of_consolidated_file = 1024 * 1024 * 1024 # 1 GB, a good size for cloud bucket files + consolidated_fname = f"{root}/consolidated.{consolidated_id}.{consolidated_index}" + consolidated_f = open(consolidated_fname, "wb") + try: + for fname in files: + full_fname = f"{root}/{fname}" + with open(full_fname, "rb") as f2: + consolidated_f.write(f2.read()) + offset_maps[full_fname] = (consolidated_fname, offset) + offset += os.path.getsize(full_fname) + if offset > max_size_of_consolidated_file: + consolidated_f.close() + consolidated_index += 1 + consolidated_fname = f"{root}/consolidated.{consolidated_id}.{consolidated_index}" + consolidated_f = open(consolidated_fname, "wb") + offset = 0 + finally: + consolidated_f.close() + for key in refs_keys_for_this_dir: + filename, old_offset, old_size = rfs['refs'][key] + if filename not in offset_maps: + continue + consolidated_fname, new_offset = offset_maps[filename] + rfs['refs'][key] = [consolidated_fname, new_offset + old_offset, old_size] + # remove the old files + for fname in files: + os.remove(f"{root}/{fname}") + + +def _sort_by_chunk_key(files: list) -> list: + # first verify that all the files have the same number of parts + num_parts = None + for fname in files: + parts = fname.split('.') + if num_parts is None: + num_parts = len(parts) + elif len(parts) != num_parts: + raise ValueError(f"Files have different numbers of parts: {files}") + # Verify that all the parts are integers + for fname in files: + parts = fname.split('.') + for p in parts: + try: + int(p) + except ValueError: + raise ValueError(f"File part is not an integer: {fname}") + + def _chunk_key(fname: str) -> tuple: + parts = fname.split('.') + return tuple(int(p) for p in parts) + return sorted(files, key=_chunk_key) + + +def _upload_directory_of_blobs( + staging_dir: str, + on_upload_blob: UploadFileFunc +) -> dict: + """ + Upload all the files in a directory to a storage system and return a mapping + from the original file paths to the URLs of the uploaded files. + """ + all_files = [] + for root, dirs, files in os.walk(staging_dir): + for fname in files: + full_fname = f"{root}/{fname}" + all_files.append(full_fname) + blob_mapping = {} + for i, full_fname in enumerate(all_files): + relative_fname = full_fname[len(staging_dir):] + size_bytes = os.path.getsize(full_fname) + print(f'Uploading blob {i + 1} of {len(all_files)} {relative_fname} ({_format_size_bytes(size_bytes)})') + blob_url = on_upload_blob(full_fname) + blob_mapping[full_fname] = blob_url + return blob_mapping + + +def _format_size_bytes(size_bytes: int) -> str: + if size_bytes < 1024: + return f"{size_bytes} bytes" + elif size_bytes < 1024 * 1024: + return f"{size_bytes / 1024:.1f} KB" + elif size_bytes < 1024 * 1024 * 1024: + return f"{size_bytes / 1024 / 1024:.1f} MB" + else: + return f"{size_bytes / 1024 / 1024 / 1024:.1f} GB" diff --git a/lindi/LindiStagingStore/StagingArea.py b/lindi/LindiStagingStore/StagingArea.py new file mode 100644 index 0000000..b9f41d3 --- /dev/null +++ b/lindi/LindiStagingStore/StagingArea.py @@ -0,0 +1,98 @@ +import os +import random +import string +import datetime +import shutil + + +class StagingArea: + """ + A staging area where files can be stored temporarily before being + consolidated and uploaded to a storage system. + + This class is a context manager, so it can be used in a `with` statement to + ensure that the staging area is cleaned up when it is no longer needed. + """ + def __init__(self, *, _directory: str) -> None: + """ + Do not call this constructor directly. Instead, use the `create` method + to create a new staging area. + """ + self._directory = os.path.abspath(_directory) + + @staticmethod + def create(base_dir: str) -> 'StagingArea': + """ + Create a new staging area. + + Parameters + ---------- + base_dir : str + The base directory where the staging area will be created. The + staging directory will be a subdirectory of this directory. + """ + dir = os.path.join(base_dir, _create_random_id()) + return StagingArea(_directory=dir) + + def cleanup(self) -> None: + """ + Clean up the staging area, deleting all files in it. This method is + called automatically when the staging area is used as a context manager + in a `with` statement. + """ + if os.path.exists(self._directory): + shutil.rmtree(self._directory) + + def __enter__(self) -> 'StagingArea': + return self + + def __exit__(self, exc_type, exc_value, traceback) -> None: + self.cleanup() + + @property + def directory(self) -> str: + """ + The directory where the files are stored. + """ + return self._directory + + def store_file(self, relpath: str, value: bytes) -> str: + """ + Store a file in the staging area. + + Parameters + ---------- + relpath : str + The relative path to the file, relative to the staging area root. + value : bytes + The contents of the file. + """ + path = os.path.join(self._directory, relpath) + os.makedirs(os.path.dirname(path), exist_ok=True) + with open(path, 'wb') as f: + f.write(value) + return path + + def get_full_path(self, relpath: str) -> str: + """ + Get the full path to a file in the staging area. + + Parameters + ---------- + relpath : str + The relative path to the file, relative to the staging area root. + """ + return os.path.join(self._directory, relpath) + + +def _create_random_id(): + # This is going to be a timestamp suitable for alphabetical chronological order plus a random string + return f"{_timestamp_str()}-{_random_str(8)}" + + +def _timestamp_str(): + return datetime.datetime.now().strftime("%Y%m%d%H%M%S") + + +def _random_str(n): + return ''.join(random.choices(string.ascii_lowercase + string.digits, k=n)) diff --git a/lindi/LindiStagingStore/__init__.py b/lindi/LindiStagingStore/__init__.py new file mode 100644 index 0000000..7ab3d49 --- /dev/null +++ b/lindi/LindiStagingStore/__init__.py @@ -0,0 +1 @@ +from .LindiStagingStore import LindiStagingStore, StagingArea # noqa: F401 diff --git a/lindi/__init__.py b/lindi/__init__.py index 3c9754a..794e1cc 100644 --- a/lindi/__init__.py +++ b/lindi/__init__.py @@ -1,2 +1,3 @@ from .LindiH5ZarrStore import LindiH5ZarrStore, LindiH5ZarrStoreOpts # noqa: F401 from .LindiH5pyFile import LindiH5pyFile, LindiH5pyGroup, LindiH5pyDataset, LindiH5pyHardLink, LindiH5pySoftLink # noqa: F401 +from .LindiStagingStore import LindiStagingStore, StagingArea # noqa: F401 diff --git a/tests/test_core.py b/tests/test_core.py index 20a0d46..f57f9d7 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -304,8 +304,8 @@ def test_reference_file_system_to_file(): with h5py.File(filename, "w") as f: f.create_dataset("X", data=[1, 2, 3]) with LindiH5ZarrStore.from_file(filename, url=filename) as store: - rfs_fname = f'{tmpdir}/test.zarr.json' - store.to_file(rfs_fname) + rfs_fname = f'{tmpdir}/test.lindi.json' + store.write_reference_file_system(rfs_fname) client = lindi.LindiH5pyFile.from_reference_file_system(rfs_fname) X = client["X"] assert isinstance(X, lindi.LindiH5pyDataset) @@ -423,7 +423,7 @@ def test_lindi_h5_zarr_store(): with pytest.raises(Exception, match=store_is_closed_msg): store.to_reference_file_system() with pytest.raises(Exception, match=store_is_closed_msg): - store.to_file("test.json") + store.write_reference_file_system("test.lindi.json") with pytest.raises(Exception, match=store_is_closed_msg): store._get_chunk_file_bytes_data("dataset1", "0") @@ -443,17 +443,13 @@ def test_lindi_h5_zarr_store(): store["nonexistent/0"] # Key error - store = LindiH5ZarrStore.from_file(filename) + store = LindiH5ZarrStore.from_file(filename, url='.') with pytest.raises(KeyError): store[''] assert '' not in store with pytest.raises(KeyError): store["nonexistent/.zattrs"] - # Unsupported file type - with pytest.raises(Exception, match="Unsupported file type: zarr"): - store.to_file("test.json", file_type="zarr") # type: ignore - # URL is not set store = LindiH5ZarrStore.from_file(filename, url=None) with pytest.raises(Exception, match="You must specify a url to create a reference file system"): diff --git a/tests/test_staging_area.py b/tests/test_staging_area.py new file mode 100644 index 0000000..e79f58b --- /dev/null +++ b/tests/test_staging_area.py @@ -0,0 +1,68 @@ +import tempfile +import os +import numpy as np +import lindi +import shutil + + +def test_staging_area(): + with tempfile.TemporaryDirectory() as tmpdir: + staging_area = lindi.StagingArea.create(tmpdir + '/staging_area') + empty_rfs = {'refs': {'.zgroup': {'zarr_format': 2}}} + client = lindi.LindiH5pyFile.from_reference_file_system(empty_rfs, mode='r+', staging_area=staging_area) + X = np.random.randn(1000, 1000).astype(np.float32) + client.create_dataset('large_array', data=X, chunks=(400, 400)) + total_size = _get_total_size_of_directory(tmpdir) + assert total_size >= X.nbytes * 0.5, f'{total_size} < {X.nbytes} * 0.5' # take into consideration compression + rfs = client.to_reference_file_system() + client2 = lindi.LindiH5pyFile.from_reference_file_system(rfs, mode='r') + assert isinstance(client2, lindi.LindiH5pyFile) + X1 = client['large_array'] + assert isinstance(X1, lindi.LindiH5pyDataset) + X2 = client2['large_array'] + assert isinstance(X2, lindi.LindiH5pyDataset) + assert np.allclose(X1[:], X2[:]) + + upload_dir = f'{tmpdir}/upload_dir' + os.makedirs(upload_dir, exist_ok=True) + output_fname = f'{tmpdir}/output.lindi.json' + + def on_upload_blob(fname: str): + random_fname = f'{upload_dir}/{_random_string(10)}' + shutil.copy(fname, random_fname) + return random_fname + + def on_upload_main(fname: str): + shutil.copy(fname, output_fname) + return output_fname + + assert client.staging_store + client.staging_store.upload( + on_upload_blob=on_upload_blob, + on_upload_main=on_upload_main, + consolidate_chunks=True + ) + + client3 = lindi.LindiH5pyFile.from_reference_file_system(output_fname, mode='r') + X3 = client3['large_array'] + assert isinstance(X3, lindi.LindiH5pyDataset) + assert np.allclose(X1[:], X3[:]) + + +def _get_total_size_of_directory(directory): + total_size = 0 + for dirpath, dirnames, filenames in os.walk(directory): + for f in filenames: + fp = os.path.join(dirpath, f) + total_size += os.path.getsize(fp) + return total_size + + +def _random_string(n): + import random + import string + return ''.join(random.choices(string.ascii_uppercase + string.digits, k=n)) + + +if __name__ == '__main__': + test_staging_area() diff --git a/tests/test_store.py b/tests/test_store.py index c2ed53b..07d59cf 100644 --- a/tests/test_store.py +++ b/tests/test_store.py @@ -13,7 +13,7 @@ def test_store(): group1.create_group("group2") group1.create_dataset("dataset2", data=[4, 5, 6]) with lindi.LindiH5ZarrStore.from_file(filename, url=filename) as store: - store.to_file(f"{tmpdir}/test.zarr.json") # for coverage + store.write_reference_file_system(f"{tmpdir}/test.lindi.json") # for coverage a = store.listdir('') assert _lists_are_equal_as_sets(a, ['dataset1', 'group1']) b = store.listdir('group1')