From a55f6af816e85ae719ba1462acb50bf0b8787a59 Mon Sep 17 00:00:00 2001 From: Jeremy Magland Date: Thu, 4 Apr 2024 19:36:28 -0400 Subject: [PATCH 1/9] implement LindiStagingStore --- lindi/LindiH5pyFile/LindiH5pyFile.py | 15 +- lindi/LindiStagingStore/LindiStagingStore.py | 232 +++++++++++++++++++ lindi/LindiStagingStore/StagingArea.py | 98 ++++++++ lindi/LindiStagingStore/__init__.py | 0 4 files changed, 342 insertions(+), 3 deletions(-) create mode 100644 lindi/LindiStagingStore/LindiStagingStore.py create mode 100644 lindi/LindiStagingStore/StagingArea.py create mode 100644 lindi/LindiStagingStore/__init__.py diff --git a/lindi/LindiH5pyFile/LindiH5pyFile.py b/lindi/LindiH5pyFile/LindiH5pyFile.py index d25f2fb..e2ecbe7 100644 --- a/lindi/LindiH5pyFile/LindiH5pyFile.py +++ b/lindi/LindiH5pyFile/LindiH5pyFile.py @@ -12,6 +12,9 @@ from .LindiH5pyReference import LindiH5pyReference from .LindiReferenceFileSystemStore import LindiReferenceFileSystemStore +from ..LindiStagingStore.StagingArea import StagingArea +from ..LindiStagingStore.LindiStagingStore import LindiStagingStore + class LindiH5pyFile(h5py.File): def __init__(self, _file_object: Union[h5py.File, zarr.Group], *, _zarr_store: Union[ZarrStore, None] = None, _mode: Literal["r", "r+"] = "r"): @@ -26,7 +29,7 @@ def __init__(self, _file_object: Union[h5py.File, zarr.Group], *, _zarr_store: U self._the_group = LindiH5pyGroup(_file_object, self) @staticmethod - def from_reference_file_system(rfs: Union[dict, str], mode: Literal["r", "r+"] = "r"): + def from_reference_file_system(rfs: Union[dict, str], mode: Literal["r", "r+"] = "r", staging_area: Union[StagingArea, None] = None): """ Create a LindiH5pyFile from a reference file system. @@ -44,6 +47,10 @@ def from_reference_file_system(rfs: Union[dict, str], mode: Literal["r", "r+"] = to_reference_file_system() to export the updated reference file system to the same file or a new file. 
""" + if staging_area is not None: + if mode not in ['r+']: + raise Exception("Staging area cannot be used in read-only mode") + if isinstance(rfs, str): if rfs.startswith("http") or rfs.startswith("https"): with tempfile.TemporaryDirectory() as tmpdir: @@ -52,15 +59,17 @@ def from_reference_file_system(rfs: Union[dict, str], mode: Literal["r", "r+"] = with open(filename, "r") as f: data = json.load(f) assert isinstance(data, dict) # prevent infinite recursion - return LindiH5pyFile.from_reference_file_system(data, mode=mode) + return LindiH5pyFile.from_reference_file_system(data, mode=mode, staging_area=staging_area) else: with open(rfs, "r") as f: data = json.load(f) assert isinstance(data, dict) # prevent infinite recursion - return LindiH5pyFile.from_reference_file_system(data, mode=mode) + return LindiH5pyFile.from_reference_file_system(data, mode=mode, staging_area=staging_area) elif isinstance(rfs, dict): # This store does not need to be closed store = LindiReferenceFileSystemStore(rfs) + if staging_area: + store = LindiStagingStore(base_store=store, staging_area=staging_area) return LindiH5pyFile.from_zarr_store(store, mode=mode) else: raise Exception(f"Unhandled type for rfs: {type(rfs)}") diff --git a/lindi/LindiStagingStore/LindiStagingStore.py b/lindi/LindiStagingStore/LindiStagingStore.py new file mode 100644 index 0000000..35edfdc --- /dev/null +++ b/lindi/LindiStagingStore/LindiStagingStore.py @@ -0,0 +1,232 @@ +from typing import Callable +import json +import tempfile +import os +from zarr.storage import Store as ZarrStore +from ..LindiH5pyFile.LindiReferenceFileSystemStore import LindiReferenceFileSystemStore +from .StagingArea import StagingArea, _random_str + + +# Accepts a string path to a file, stores it somewhere, and returns a string URL +# (or local path) +StoreFileFunc = Callable[[str], str] + + +class LindiStagingStore(ZarrStore): + """ + A Zarr store that allows supplementing a base LindiReferenceFileSystemStore + where the large data blobs are stored in a staging area. After writing new + data to the store, the data blobs can be consolidated into larger files and + then uploaded to a custom storage system, for example DANDI or a cloud + bucket. + """ + def __init__(self, *, base_store: LindiReferenceFileSystemStore, staging_area: StagingArea): + """ + Create a LindiStagingStore. + + Parameters + ---------- + base_store : LindiReferenceFileSystemStore + The base store that this store supplements. + staging_area : StagingArea + The staging area where large data blobs are stored. 
+ """ + self._base_store = base_store + self._staging_area = staging_area + + def __getitem__(self, key: str): + return self._base_store.__getitem__(key) + + def __setitem__(self, key: str, value: bytes): + key_parts = key.split("/") + key_base_name = key_parts[-1] + if key_base_name.startswith('.') or key_base_name.endswith('.json'): # always inline .zattrs, .zgroup, .zarray, zarr.json + inline = True + else: + # presumably it is a chunk of an array + if not isinstance(value, bytes): + raise ValueError("Value must be bytes") + size = len(value) + inline = size < 1000 # this should be a configurable threshold + if inline: + # If inline, save in memory + return self._base_store.__setitem__(key, value) + else: + # If not inline, save it as a file in the staging directory + key_without_initial_slash = key if not key.startswith("/") else key[1:] + stored_file_path = self._staging_area.store_file(key_without_initial_slash, value) + + self._set_ref_reference(key_without_initial_slash, stored_file_path, 0, len(value)) + + def __delitem__(self, key: str): + # We don't delete the file from the staging directory, because that + # would be dangerous if the file was part of a consolidated file. + return self._base_store.__delitem__(key) + + def __iter__(self): + return self._base_store.__iter__() + + def __len__(self): + return self._base_store.__len__() + + # These methods are overridden from BaseStore + def is_readable(self): + return True + + def is_writeable(self): + return True + + def is_listable(self): + return True + + def is_erasable(self): + return False + + def _set_ref_reference(self, key: str, filename: str, offset: int, size: int): + rfs = self._base_store.rfs + if 'refs' not in rfs: + # this shouldn't happen, but we'll be defensive + rfs['refs'] = {} + rfs['refs'][key] = [ + filename, + offset, + size + ] + + def upload( + self, + *, + on_store_blob: StoreFileFunc, + on_store_rfs: StoreFileFunc, + consolidate_chunks: bool = True + ): + """ + Consolidate the chunks in the staging area, upload them to a storage + system, updating the references in the base store, and then upload the + updated reference file system. + + Parameters + ---------- + on_store_blob : StoreFileFunc + A function that takes a string path to a file, stores it somewhere, + and returns a string URL (or local path). + on_store_rfs : StoreFileFunc + A function that takes a string path to a file, stores it somewhere, + and returns a string URL (or local path). + consolidate_chunks : bool + If True (the default), consolidate the chunks in the staging area + before uploading. + """ + if consolidate_chunks: + self.consolidate_chunks() + rfs = self._base_store.rfs + blob_mapping = _upload_directory_of_blobs(self._staging_area.directory, on_store_blob=on_store_blob) + for k, v in rfs['refs'].items(): + if isinstance(v, list) and len(v) == 3: + url1 = v[0] + if url1.startswith(self._staging_area.directory + '/'): + url2 = blob_mapping.get(url1, None) + if url2 is None: + raise ValueError(f"Could not find url in blob mapping: {url1}") + rfs['refs'][k][0] = url2 + with tempfile.TemporaryDirectory() as tmpdir: + rfs_fname = f"{tmpdir}/rfs.json" + with open(rfs_fname, 'w') as f: + json.dump(rfs, f, indent=2, sort_keys=True) + return on_store_rfs(rfs_fname) + + def consolidate_chunks(self): + """ + Consolidate the chunks in the staging area. + + This method is called by `upload` if `consolidate_chunks` is True. 
+ """ + rfs = self._base_store.rfs + refs_keys_by_reference_parent_path = {} + for k, v in rfs['refs'].items(): + if isinstance(v, list) and len(v) == 3: + url = v[0] + if not url.startswith(self._staging_area.directory + '/'): + continue + parent_path = os.path.dirname(url) + if parent_path not in refs_keys_by_reference_parent_path: + refs_keys_by_reference_parent_path[parent_path] = [] + refs_keys_by_reference_parent_path[parent_path].append(k) + for root, dirs, files1 in os.walk(self._staging_area._directory): + files = [ + f for f in files1 + if not f.startswith('.') and not f.endswith('.json') and not f.startswith('consolidated.') + ] + if len(files) <= 1: + continue + refs_keys_for_this_dir = refs_keys_by_reference_parent_path.get(root, []) + if len(refs_keys_for_this_dir) <= 1: + continue + + print(f'Consolidating {len(files)} files in {root}') + + offset = 0 + offset_maps = {} + consolidated_id = _random_str(8) + consolidated_index = 0 + max_size_of_consolidated_file = 1024 * 1024 * 1024 # 1 GB, a good size for cloud bucket files + consolidated_fname = f"{root}/consolidated.{consolidated_id}.{consolidated_index}" + consolidated_f = open(consolidated_fname, "wb") + try: + for fname in files: + full_fname = f"{root}/{fname}" + with open(full_fname, "rb") as f2: + consolidated_f.write(f2.read()) + offset_maps[full_fname] = (consolidated_fname, offset) + offset += os.path.getsize(full_fname) + if offset > max_size_of_consolidated_file: + consolidated_f.close() + consolidated_index += 1 + consolidated_fname = f"{root}/consolidated.{consolidated_id}.{consolidated_index}" + consolidated_f = open(consolidated_fname, "wb") + offset = 0 + finally: + consolidated_f.close() + for key in refs_keys_for_this_dir: + filename, old_offset, old_size = rfs['refs'][key] + if filename not in offset_maps: + continue + consolidated_fname, new_offset = offset_maps[filename] + rfs['refs'][key] = [consolidated_fname, new_offset + old_offset, old_size] + # remove the old files + for fname in files: + os.remove(f"{root}/{fname}") + + +def _upload_directory_of_blobs( + staging_dir: str, + on_store_blob: StoreFileFunc +) -> dict: + """ + Upload all the files in a directory to a storage system and return a mapping + from the original file paths to the URLs of the uploaded files. 
+ """ + all_files = [] + for root, dirs, files in os.walk(staging_dir): + for fname in files: + full_fname = f"{root}/{fname}" + all_files.append(full_fname) + blob_mapping = {} + for i, full_fname in enumerate(all_files): + relative_fname = full_fname[len(staging_dir):] + size_bytes = os.path.getsize(full_fname) + print(f'Uploading blob {i + 1} of {len(all_files)} {relative_fname} ({_format_size_bytes(size_bytes)})') + blob_url = on_store_blob(full_fname) + blob_mapping[full_fname] = blob_url + return blob_mapping + + +def _format_size_bytes(size_bytes: int) -> str: + if size_bytes < 1024: + return f"{size_bytes} bytes" + elif size_bytes < 1024 * 1024: + return f"{size_bytes / 1024:.1f} KB" + elif size_bytes < 1024 * 1024 * 1024: + return f"{size_bytes / 1024 / 1024:.1f} MB" + else: + return f"{size_bytes / 1024 / 1024 / 1024:.1f} GB" diff --git a/lindi/LindiStagingStore/StagingArea.py b/lindi/LindiStagingStore/StagingArea.py new file mode 100644 index 0000000..b9f41d3 --- /dev/null +++ b/lindi/LindiStagingStore/StagingArea.py @@ -0,0 +1,98 @@ +import os +import random +import string +import datetime +import shutil + + +class StagingArea: + """ + A staging area where files can be stored temporarily before being + consolidated and uploaded to a storage system. + + This class is a context manager, so it can be used in a `with` statement to + ensure that the staging area is cleaned up when it is no longer needed. + """ + def __init__(self, *, _directory: str) -> None: + """ + Do not call this constructor directly. Instead, use the `create` method + to create a new staging area. + """ + self._directory = os.path.abspath(_directory) + + @staticmethod + def create(base_dir: str) -> 'StagingArea': + """ + Create a new staging area. + + Parameters + ---------- + base_dir : str + The base directory where the staging area will be created. The + staging directory will be a subdirectory of this directory. + """ + dir = os.path.join(base_dir, _create_random_id()) + return StagingArea(_directory=dir) + + def cleanup(self) -> None: + """ + Clean up the staging area, deleting all files in it. This method is + called automatically when the staging area is used as a context manager + in a `with` statement. + """ + if os.path.exists(self._directory): + shutil.rmtree(self._directory) + + def __enter__(self) -> 'StagingArea': + return self + + def __exit__(self, exc_type, exc_value, traceback) -> None: + self.cleanup() + + @property + def directory(self) -> str: + """ + The directory where the files are stored. + """ + return self._directory + + def store_file(self, relpath: str, value: bytes) -> str: + """ + Store a file in the staging area. + + Parameters + ---------- + relpath : str + The relative path to the file, relative to the staging area root. + value : bytes + The contents of the file. + """ + path = os.path.join(self._directory, relpath) + os.makedirs(os.path.dirname(path), exist_ok=True) + with open(path, 'wb') as f: + f.write(value) + return path + + def get_full_path(self, relpath: str) -> str: + """ + Get the full path to a file in the staging area. + + Parameters + ---------- + relpath : str + The relative path to the file, relative to the staging area root. 
+ """ + return os.path.join(self._directory, relpath) + + +def _create_random_id(): + # This is going to be a timestamp suitable for alphabetical chronological order plus a random string + return f"{_timestamp_str()}-{_random_str(8)}" + + +def _timestamp_str(): + return datetime.datetime.now().strftime("%Y%m%d%H%M%S") + + +def _random_str(n): + return ''.join(random.choices(string.ascii_lowercase + string.digits, k=n)) diff --git a/lindi/LindiStagingStore/__init__.py b/lindi/LindiStagingStore/__init__.py new file mode 100644 index 0000000..e69de29 From 4d51de2474a0bb52b86de00be1ce16acc3b91b3d Mon Sep 17 00:00:00 2001 From: Jeremy Magland Date: Fri, 5 Apr 2024 12:14:12 -0400 Subject: [PATCH 2/9] implement staging store --- lindi/LindiH5pyFile/LindiH5pyFile.py | 9 ++++++++ lindi/LindiStagingStore/LindiStagingStore.py | 22 +++++++++++++------- lindi/LindiStagingStore/__init__.py | 1 + 3 files changed, 24 insertions(+), 8 deletions(-) diff --git a/lindi/LindiH5pyFile/LindiH5pyFile.py b/lindi/LindiH5pyFile/LindiH5pyFile.py index e2ecbe7..fcd1278 100644 --- a/lindi/LindiH5pyFile/LindiH5pyFile.py +++ b/lindi/LindiH5pyFile/LindiH5pyFile.py @@ -345,6 +345,15 @@ def require_dataset(self, name, shape, dtype, exact=False, **kwds): raise Exception("Cannot require dataset in read-only mode") return self._the_group.require_dataset(name, shape, dtype, exact=exact, **kwds) + ############################## + # staging store + @property + def staging_store(self): + store = self._zarr_store + if not isinstance(store, LindiStagingStore): + return None + return store + def _download_file(url: str, filename: str) -> None: headers = { diff --git a/lindi/LindiStagingStore/LindiStagingStore.py b/lindi/LindiStagingStore/LindiStagingStore.py index 35edfdc..f9422fa 100644 --- a/lindi/LindiStagingStore/LindiStagingStore.py +++ b/lindi/LindiStagingStore/LindiStagingStore.py @@ -97,25 +97,31 @@ def upload( self, *, on_store_blob: StoreFileFunc, - on_store_rfs: StoreFileFunc, + on_store_main: StoreFileFunc, consolidate_chunks: bool = True ): """ Consolidate the chunks in the staging area, upload them to a storage system, updating the references in the base store, and then upload the - updated reference file system. + updated reference file system .json file. Parameters ---------- on_store_blob : StoreFileFunc - A function that takes a string path to a file, stores it somewhere, - and returns a string URL (or local path). - on_store_rfs : StoreFileFunc - A function that takes a string path to a file, stores it somewhere, - and returns a string URL (or local path). + A function that takes a string path to a blob file, stores it + somewhere, and returns a string URL (or local path). + on_store_main : StoreFileFunc + A function that takes a string path to the main .json file, stores + it somewhere, and returns a string URL (or local path). consolidate_chunks : bool If True (the default), consolidate the chunks in the staging area before uploading. + + Returns + ------- + str + The URL (or local path) of the uploaded reference file system .json + file. 
""" if consolidate_chunks: self.consolidate_chunks() @@ -133,7 +139,7 @@ def upload( rfs_fname = f"{tmpdir}/rfs.json" with open(rfs_fname, 'w') as f: json.dump(rfs, f, indent=2, sort_keys=True) - return on_store_rfs(rfs_fname) + return on_store_main(rfs_fname) def consolidate_chunks(self): """ diff --git a/lindi/LindiStagingStore/__init__.py b/lindi/LindiStagingStore/__init__.py index e69de29..7ab3d49 100644 --- a/lindi/LindiStagingStore/__init__.py +++ b/lindi/LindiStagingStore/__init__.py @@ -0,0 +1 @@ +from .LindiStagingStore import LindiStagingStore, StagingArea # noqa: F401 From 18ee3d9caa089848ab940c7b7fa8f3fffe62e51b Mon Sep 17 00:00:00 2001 From: Jeremy Magland Date: Fri, 5 Apr 2024 12:23:02 -0400 Subject: [PATCH 3/9] format rfs for upload --- lindi/LindiStagingStore/LindiStagingStore.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lindi/LindiStagingStore/LindiStagingStore.py b/lindi/LindiStagingStore/LindiStagingStore.py index f9422fa..9773ca5 100644 --- a/lindi/LindiStagingStore/LindiStagingStore.py +++ b/lindi/LindiStagingStore/LindiStagingStore.py @@ -126,6 +126,8 @@ def upload( if consolidate_chunks: self.consolidate_chunks() rfs = self._base_store.rfs + rfs = json.loads(json.dumps(rfs)) # deep copy + LindiReferenceFileSystemStore.replace_meta_file_contents_with_dicts(rfs) blob_mapping = _upload_directory_of_blobs(self._staging_area.directory, on_store_blob=on_store_blob) for k, v in rfs['refs'].items(): if isinstance(v, list) and len(v) == 3: From 6776fecd5273045fe4ef454c6655ce014e3e5f86 Mon Sep 17 00:00:00 2001 From: Jeremy Magland Date: Fri, 5 Apr 2024 13:40:18 -0400 Subject: [PATCH 4/9] update __init__.py for staging store --- lindi/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/lindi/__init__.py b/lindi/__init__.py index 3c9754a..794e1cc 100644 --- a/lindi/__init__.py +++ b/lindi/__init__.py @@ -1,2 +1,3 @@ from .LindiH5ZarrStore import LindiH5ZarrStore, LindiH5ZarrStoreOpts # noqa: F401 from .LindiH5pyFile import LindiH5pyFile, LindiH5pyGroup, LindiH5pyDataset, LindiH5pyHardLink, LindiH5pySoftLink # noqa: F401 +from .LindiStagingStore import LindiStagingStore, StagingArea # noqa: F401 From 07658fd6710c91150816bce4ce5dc5868a1e9dd7 Mon Sep 17 00:00:00 2001 From: Jeremy Magland Date: Mon, 15 Apr 2024 11:54:39 -0400 Subject: [PATCH 5/9] sort files prior to consolidating chunks --- lindi/LindiStagingStore/LindiStagingStore.py | 27 ++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/lindi/LindiStagingStore/LindiStagingStore.py b/lindi/LindiStagingStore/LindiStagingStore.py index 9773ca5..78e01a5 100644 --- a/lindi/LindiStagingStore/LindiStagingStore.py +++ b/lindi/LindiStagingStore/LindiStagingStore.py @@ -171,6 +171,9 @@ def consolidate_chunks(self): if len(refs_keys_for_this_dir) <= 1: continue + # sort so that the files are in order 0.0.0, 0.0.1, 0.0.2, ... 
+ files = _sort_by_chunk_key(files) + print(f'Consolidating {len(files)} files in {root}') offset = 0 @@ -206,6 +209,30 @@ def consolidate_chunks(self): os.remove(f"{root}/{fname}") +def _sort_by_chunk_key(files: list) -> list: + # first verify that all the files have the same number of parts + num_parts = None + for fname in files: + parts = fname.split('.') + if num_parts is None: + num_parts = len(parts) + elif len(parts) != num_parts: + raise ValueError(f"Files have different numbers of parts: {files}") + # Verify that all the parts are integers + for fname in files: + parts = fname.split('.') + for p in parts: + try: + int(p) + except ValueError: + raise ValueError(f"File part is not an integer: {fname}") + + def _chunk_key(fname: str) -> tuple: + parts = fname.split('.') + return tuple(int(p) for p in parts) + return sorted(files, key=_chunk_key) + + def _upload_directory_of_blobs( staging_dir: str, on_store_blob: StoreFileFunc From c650217c14d4b9d420fe65e9448d7016e39eea00 Mon Sep 17 00:00:00 2001 From: Jeremy Magland Date: Thu, 18 Apr 2024 20:33:28 -0500 Subject: [PATCH 6/9] rename on_store -> on_upload --- lindi/LindiStagingStore/LindiStagingStore.py | 22 ++++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/lindi/LindiStagingStore/LindiStagingStore.py b/lindi/LindiStagingStore/LindiStagingStore.py index 78e01a5..a140181 100644 --- a/lindi/LindiStagingStore/LindiStagingStore.py +++ b/lindi/LindiStagingStore/LindiStagingStore.py @@ -7,9 +7,9 @@ from .StagingArea import StagingArea, _random_str -# Accepts a string path to a file, stores it somewhere, and returns a string URL +# Accepts a string path to a file, uploads (or copies) it somewhere, and returns a string URL # (or local path) -StoreFileFunc = Callable[[str], str] +UploadFileFunc = Callable[[str], str] class LindiStagingStore(ZarrStore): @@ -96,8 +96,8 @@ def _set_ref_reference(self, key: str, filename: str, offset: int, size: int): def upload( self, *, - on_store_blob: StoreFileFunc, - on_store_main: StoreFileFunc, + on_upload_blob: UploadFileFunc, + on_upload_main: UploadFileFunc, consolidate_chunks: bool = True ): """ @@ -107,10 +107,10 @@ def upload( Parameters ---------- - on_store_blob : StoreFileFunc - A function that takes a string path to a blob file, stores it + on_upload_blob : StoreFileFunc + A function that takes a string path to a blob file, uploads or copies it somewhere, and returns a string URL (or local path). - on_store_main : StoreFileFunc + on_upload_main : StoreFileFunc A function that takes a string path to the main .json file, stores it somewhere, and returns a string URL (or local path). 
consolidate_chunks : bool @@ -128,7 +128,7 @@ def upload( rfs = self._base_store.rfs rfs = json.loads(json.dumps(rfs)) # deep copy LindiReferenceFileSystemStore.replace_meta_file_contents_with_dicts(rfs) - blob_mapping = _upload_directory_of_blobs(self._staging_area.directory, on_store_blob=on_store_blob) + blob_mapping = _upload_directory_of_blobs(self._staging_area.directory, on_upload_blob=on_upload_blob) for k, v in rfs['refs'].items(): if isinstance(v, list) and len(v) == 3: url1 = v[0] @@ -141,7 +141,7 @@ def upload( rfs_fname = f"{tmpdir}/rfs.json" with open(rfs_fname, 'w') as f: json.dump(rfs, f, indent=2, sort_keys=True) - return on_store_main(rfs_fname) + return on_upload_main(rfs_fname) def consolidate_chunks(self): """ @@ -235,7 +235,7 @@ def _chunk_key(fname: str) -> tuple: def _upload_directory_of_blobs( staging_dir: str, - on_store_blob: StoreFileFunc + on_upload_blob: UploadFileFunc ) -> dict: """ Upload all the files in a directory to a storage system and return a mapping @@ -251,7 +251,7 @@ def _upload_directory_of_blobs( relative_fname = full_fname[len(staging_dir):] size_bytes = os.path.getsize(full_fname) print(f'Uploading blob {i + 1} of {len(all_files)} {relative_fname} ({_format_size_bytes(size_bytes)})') - blob_url = on_store_blob(full_fname) + blob_url = on_upload_blob(full_fname) blob_mapping[full_fname] = blob_url return blob_mapping From f4f9a02e7457ef1be2204bbd75e37f03d300b4b8 Mon Sep 17 00:00:00 2001 From: Jeremy Magland Date: Fri, 19 Apr 2024 10:13:57 -0500 Subject: [PATCH 7/9] test_staging_area --- lindi/LindiH5pyFile/LindiH5pyFile.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lindi/LindiH5pyFile/LindiH5pyFile.py b/lindi/LindiH5pyFile/LindiH5pyFile.py index 9682806..e8f9a50 100644 --- a/lindi/LindiH5pyFile/LindiH5pyFile.py +++ b/lindi/LindiH5pyFile/LindiH5pyFile.py @@ -140,9 +140,12 @@ def to_reference_file_system(self): """ if self._zarr_store is None: raise Exception("Cannot convert to reference file system without zarr store") - if not isinstance(self._zarr_store, LindiReferenceFileSystemStore): + zarr_store = self._zarr_store + if isinstance(zarr_store, LindiStagingStore): + zarr_store = zarr_store._base_store + if not isinstance(zarr_store, LindiReferenceFileSystemStore): raise Exception(f"Unexpected type for zarr store: {type(self._zarr_store)}") - rfs = self._zarr_store.rfs + rfs = zarr_store.rfs rfs_copy = json.loads(json.dumps(rfs)) LindiReferenceFileSystemStore.replace_meta_file_contents_with_dicts(rfs_copy) return rfs_copy From 0b35daed749bab8b118310b8b2fa003e6dca7d68 Mon Sep 17 00:00:00 2001 From: Jeremy Magland Date: Fri, 19 Apr 2024 10:28:38 -0500 Subject: [PATCH 8/9] to_file -> write_reference_file_system --- lindi/LindiH5ZarrStore/LindiH5ZarrStore.py | 11 ++-- tests/test_core.py | 12 ++-- tests/test_staging_area.py | 68 ++++++++++++++++++++++ tests/test_store.py | 2 +- 4 files changed, 79 insertions(+), 14 deletions(-) create mode 100644 tests/test_staging_area.py diff --git a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py index 9858248..589b56d 100644 --- a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py +++ b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py @@ -1,6 +1,6 @@ import json import base64 -from typing import Union, List, IO, Any, Dict, Literal +from typing import Union, List, IO, Any, Dict from dataclasses import dataclass import numpy as np import zarr @@ -460,16 +460,17 @@ def listdir(self, path: str = "") -> List[str]: else: return [] - def to_file(self, file_name: 
str, *, file_type: Literal["zarr.json"] = "zarr.json"): + def write_reference_file_system(self, output_file_name: str): """Write a reference file system corresponding to this store to a file. This can then be loaded using LindiH5pyFile.from_reference_file_system(file_name) """ - if file_type != "zarr.json": - raise Exception(f"Unsupported file type: {file_type}") + + if not output_file_name.endswith(".lindi.json"): + raise Exception("The output file name must end with .lindi.json") ret = self.to_reference_file_system() - with open(file_name, "w") as f: + with open(output_file_name, "w") as f: json.dump(ret, f, indent=2) def to_reference_file_system(self) -> dict: diff --git a/tests/test_core.py b/tests/test_core.py index 20a0d46..f57f9d7 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -304,8 +304,8 @@ def test_reference_file_system_to_file(): with h5py.File(filename, "w") as f: f.create_dataset("X", data=[1, 2, 3]) with LindiH5ZarrStore.from_file(filename, url=filename) as store: - rfs_fname = f'{tmpdir}/test.zarr.json' - store.to_file(rfs_fname) + rfs_fname = f'{tmpdir}/test.lindi.json' + store.write_reference_file_system(rfs_fname) client = lindi.LindiH5pyFile.from_reference_file_system(rfs_fname) X = client["X"] assert isinstance(X, lindi.LindiH5pyDataset) @@ -423,7 +423,7 @@ def test_lindi_h5_zarr_store(): with pytest.raises(Exception, match=store_is_closed_msg): store.to_reference_file_system() with pytest.raises(Exception, match=store_is_closed_msg): - store.to_file("test.json") + store.write_reference_file_system("test.lindi.json") with pytest.raises(Exception, match=store_is_closed_msg): store._get_chunk_file_bytes_data("dataset1", "0") @@ -443,17 +443,13 @@ def test_lindi_h5_zarr_store(): store["nonexistent/0"] # Key error - store = LindiH5ZarrStore.from_file(filename) + store = LindiH5ZarrStore.from_file(filename, url='.') with pytest.raises(KeyError): store[''] assert '' not in store with pytest.raises(KeyError): store["nonexistent/.zattrs"] - # Unsupported file type - with pytest.raises(Exception, match="Unsupported file type: zarr"): - store.to_file("test.json", file_type="zarr") # type: ignore - # URL is not set store = LindiH5ZarrStore.from_file(filename, url=None) with pytest.raises(Exception, match="You must specify a url to create a reference file system"): diff --git a/tests/test_staging_area.py b/tests/test_staging_area.py new file mode 100644 index 0000000..e79f58b --- /dev/null +++ b/tests/test_staging_area.py @@ -0,0 +1,68 @@ +import tempfile +import os +import numpy as np +import lindi +import shutil + + +def test_staging_area(): + with tempfile.TemporaryDirectory() as tmpdir: + staging_area = lindi.StagingArea.create(tmpdir + '/staging_area') + empty_rfs = {'refs': {'.zgroup': {'zarr_format': 2}}} + client = lindi.LindiH5pyFile.from_reference_file_system(empty_rfs, mode='r+', staging_area=staging_area) + X = np.random.randn(1000, 1000).astype(np.float32) + client.create_dataset('large_array', data=X, chunks=(400, 400)) + total_size = _get_total_size_of_directory(tmpdir) + assert total_size >= X.nbytes * 0.5, f'{total_size} < {X.nbytes} * 0.5' # take into consideration compression + rfs = client.to_reference_file_system() + client2 = lindi.LindiH5pyFile.from_reference_file_system(rfs, mode='r') + assert isinstance(client2, lindi.LindiH5pyFile) + X1 = client['large_array'] + assert isinstance(X1, lindi.LindiH5pyDataset) + X2 = client2['large_array'] + assert isinstance(X2, lindi.LindiH5pyDataset) + assert np.allclose(X1[:], X2[:]) + + upload_dir = 
f'{tmpdir}/upload_dir' + os.makedirs(upload_dir, exist_ok=True) + output_fname = f'{tmpdir}/output.lindi.json' + + def on_upload_blob(fname: str): + random_fname = f'{upload_dir}/{_random_string(10)}' + shutil.copy(fname, random_fname) + return random_fname + + def on_upload_main(fname: str): + shutil.copy(fname, output_fname) + return output_fname + + assert client.staging_store + client.staging_store.upload( + on_upload_blob=on_upload_blob, + on_upload_main=on_upload_main, + consolidate_chunks=True + ) + + client3 = lindi.LindiH5pyFile.from_reference_file_system(output_fname, mode='r') + X3 = client3['large_array'] + assert isinstance(X3, lindi.LindiH5pyDataset) + assert np.allclose(X1[:], X3[:]) + + +def _get_total_size_of_directory(directory): + total_size = 0 + for dirpath, dirnames, filenames in os.walk(directory): + for f in filenames: + fp = os.path.join(dirpath, f) + total_size += os.path.getsize(fp) + return total_size + + +def _random_string(n): + import random + import string + return ''.join(random.choices(string.ascii_uppercase + string.digits, k=n)) + + +if __name__ == '__main__': + test_staging_area() diff --git a/tests/test_store.py b/tests/test_store.py index c2ed53b..07d59cf 100644 --- a/tests/test_store.py +++ b/tests/test_store.py @@ -13,7 +13,7 @@ def test_store(): group1.create_group("group2") group1.create_dataset("dataset2", data=[4, 5, 6]) with lindi.LindiH5ZarrStore.from_file(filename, url=filename) as store: - store.to_file(f"{tmpdir}/test.zarr.json") # for coverage + store.write_reference_file_system(f"{tmpdir}/test.lindi.json") # for coverage a = store.listdir('') assert _lists_are_equal_as_sets(a, ['dataset1', 'group1']) b = store.listdir('group1') From 5b4b6f4d24477ce21b8f1dceff572a32d5b7f771 Mon Sep 17 00:00:00 2001 From: Jeremy Magland Date: Fri, 19 Apr 2024 10:35:57 -0500 Subject: [PATCH 9/9] use _write_rfs_to_file() --- lindi/LindiH5ZarrStore/LindiH5ZarrStore.py | 8 ++++---- lindi/LindiH5ZarrStore/_util.py | 8 ++++++++ lindi/LindiStagingStore/LindiStagingStore.py | 6 +++--- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py index 589b56d..4bdc420 100644 --- a/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py +++ b/lindi/LindiH5ZarrStore/LindiH5ZarrStore.py @@ -12,7 +12,8 @@ _get_chunk_byte_range, _get_byte_range_for_contiguous_dataset, _join, - _get_chunk_names_for_dataset + _get_chunk_names_for_dataset, + _write_rfs_to_file, ) from ..conversion.attr_conversion import h5_to_zarr_attr from ..conversion.reformat_json import reformat_json @@ -469,9 +470,8 @@ def write_reference_file_system(self, output_file_name: str): if not output_file_name.endswith(".lindi.json"): raise Exception("The output file name must end with .lindi.json") - ret = self.to_reference_file_system() - with open(output_file_name, "w") as f: - json.dump(ret, f, indent=2) + rfs = self.to_reference_file_system() + _write_rfs_to_file(rfs=rfs, output_file_name=output_file_name) def to_reference_file_system(self) -> dict: """Create a reference file system corresponding to this store. 
diff --git a/lindi/LindiH5ZarrStore/_util.py b/lindi/LindiH5ZarrStore/_util.py index 0866292..6deb5c0 100644 --- a/lindi/LindiH5ZarrStore/_util.py +++ b/lindi/LindiH5ZarrStore/_util.py @@ -1,4 +1,5 @@ from typing import IO, List +import json import numpy as np import h5py @@ -83,3 +84,10 @@ def _get_chunk_names_for_dataset(chunk_coords_shape: List[int]) -> List[str]: for name0 in names0: names.append(f"{i}.{name0}") return names + + +def _write_rfs_to_file(*, rfs: dict, output_file_name: str): + """Write a reference file system to a file. + """ + with open(output_file_name, "w") as f: + json.dump(rfs, f, indent=2, sort_keys=True) diff --git a/lindi/LindiStagingStore/LindiStagingStore.py b/lindi/LindiStagingStore/LindiStagingStore.py index a140181..13f2e48 100644 --- a/lindi/LindiStagingStore/LindiStagingStore.py +++ b/lindi/LindiStagingStore/LindiStagingStore.py @@ -5,6 +5,7 @@ from zarr.storage import Store as ZarrStore from ..LindiH5pyFile.LindiReferenceFileSystemStore import LindiReferenceFileSystemStore from .StagingArea import StagingArea, _random_str +from ..LindiH5ZarrStore._util import _write_rfs_to_file # Accepts a string path to a file, uploads (or copies) it somewhere, and returns a string URL @@ -138,9 +139,8 @@ def upload( raise ValueError(f"Could not find url in blob mapping: {url1}") rfs['refs'][k][0] = url2 with tempfile.TemporaryDirectory() as tmpdir: - rfs_fname = f"{tmpdir}/rfs.json" - with open(rfs_fname, 'w') as f: - json.dump(rfs, f, indent=2, sort_keys=True) + rfs_fname = f"{tmpdir}/rfs.lindi.json" + _write_rfs_to_file(rfs=rfs, output_file_name=rfs_fname) return on_upload_main(rfs_fname) def consolidate_chunks(self):
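Taken together, these patches support the following staging workflow. This is a minimal sketch adapted from tests/test_staging_area.py above; the copy-based upload callbacks, directory names, and file names are illustrative only, and real callbacks would push the files to DANDI or a cloud bucket and return the resulting URLs.

import os
import shutil
import tempfile
import uuid

import numpy as np

import lindi

with tempfile.TemporaryDirectory() as tmpdir:
    # Create a staging area; large chunks written via the client land here
    staging_area = lindi.StagingArea.create(tmpdir + '/staging_area')
    empty_rfs = {'refs': {'.zgroup': {'zarr_format': 2}}}

    # Open a writable client backed by a LindiStagingStore
    client = lindi.LindiH5pyFile.from_reference_file_system(
        empty_rfs, mode='r+', staging_area=staging_area
    )
    X = np.random.randn(1000, 1000).astype(np.float32)
    client.create_dataset('large_array', data=X, chunks=(400, 400))

    upload_dir = f'{tmpdir}/upload_dir'  # stands in for remote storage
    os.makedirs(upload_dir, exist_ok=True)
    output_fname = f'{tmpdir}/output.lindi.json'

    def on_upload_blob(fname: str) -> str:
        # Copy a (consolidated) chunk file and return its new location (or URL)
        dest = f'{upload_dir}/{uuid.uuid4().hex}'
        shutil.copy(fname, dest)
        return dest

    def on_upload_main(fname: str) -> str:
        # Copy the reference file system .json and return its new location (or URL)
        shutil.copy(fname, output_fname)
        return output_fname

    # Consolidate staged chunks, upload them, and upload the updated .lindi.json
    assert client.staging_store is not None
    client.staging_store.upload(
        on_upload_blob=on_upload_blob,
        on_upload_main=on_upload_main,
        consolidate_chunks=True,
    )

    # The uploaded reference file system can then be opened read-only
    client2 = lindi.LindiH5pyFile.from_reference_file_system(output_fname, mode='r')
    assert np.allclose(client2['large_array'][:], X)

Separately, with the rename in patch 8 a reference file system is written from an HDF5 file as below (file names here are hypothetical; the output name must end with .lindi.json):

with lindi.LindiH5ZarrStore.from_file('data.h5', url='data.h5') as store:
    store.write_reference_file_system('data.lindi.json')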