From 07658fd6710c91150816bce4ce5dc5868a1e9dd7 Mon Sep 17 00:00:00 2001
From: Jeremy Magland
Date: Mon, 15 Apr 2024 11:54:39 -0400
Subject: [PATCH] sort files prior to consolidating chunks

---
 lindi/LindiStagingStore/LindiStagingStore.py | 27 ++++++++++++++++++++
 1 file changed, 27 insertions(+)

diff --git a/lindi/LindiStagingStore/LindiStagingStore.py b/lindi/LindiStagingStore/LindiStagingStore.py
index 9773ca5..78e01a5 100644
--- a/lindi/LindiStagingStore/LindiStagingStore.py
+++ b/lindi/LindiStagingStore/LindiStagingStore.py
@@ -171,6 +171,9 @@ def consolidate_chunks(self):
             if len(refs_keys_for_this_dir) <= 1:
                 continue
 
+            # sort so that the files are in order 0.0.0, 0.0.1, 0.0.2, ...
+            files = _sort_by_chunk_key(files)
+
             print(f'Consolidating {len(files)} files in {root}')
 
             offset = 0
@@ -206,6 +209,30 @@ def consolidate_chunks(self):
                 os.remove(f"{root}/{fname}")
 
 
+def _sort_by_chunk_key(files: list) -> list:
+    # first verify that all the files have the same number of parts
+    num_parts = None
+    for fname in files:
+        parts = fname.split('.')
+        if num_parts is None:
+            num_parts = len(parts)
+        elif len(parts) != num_parts:
+            raise ValueError(f"Files have different numbers of parts: {files}")
+    # Verify that all the parts are integers
+    for fname in files:
+        parts = fname.split('.')
+        for p in parts:
+            try:
+                int(p)
+            except ValueError:
+                raise ValueError(f"File part is not an integer: {fname}")
+
+    def _chunk_key(fname: str) -> tuple:
+        parts = fname.split('.')
+        return tuple(int(p) for p in parts)
+    return sorted(files, key=_chunk_key)
+
+
 def _upload_directory_of_blobs(
     staging_dir: str,
     on_store_blob: StoreFileFunc