From 34a21f91dc7e8a90ee06c848d754c651134abcf6 Mon Sep 17 00:00:00 2001 From: Jeremy Magland Date: Fri, 2 Aug 2024 11:51:08 -0400 Subject: [PATCH] lindi tar format --- .gitignore | 1 + examples/write_lindi_binary.py | 21 ++ lindi/LindiH5pyFile/LindiH5pyFile.py | 231 +++++++++++--- .../LindiReferenceFileSystemStore.py | 61 +++- lindi/tar/LindiTarStore.py | 75 +++++ lindi/tar/lindi_tar.py | 281 ++++++++++++++++++ 6 files changed, 609 insertions(+), 61 deletions(-) create mode 100644 examples/write_lindi_binary.py create mode 100644 lindi/tar/LindiTarStore.py create mode 100644 lindi/tar/lindi_tar.py diff --git a/.gitignore b/.gitignore index c55f5c4..8c00826 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +*.lindi *.lindi.json* *.nwb diff --git a/examples/write_lindi_binary.py b/examples/write_lindi_binary.py new file mode 100644 index 0000000..bd60f31 --- /dev/null +++ b/examples/write_lindi_binary.py @@ -0,0 +1,21 @@ +import numpy as np +import lindi + + +def write_lindi_binary(): + with lindi.LindiH5pyFile.from_lindi_file('test.lindi', mode='w') as f: + f.attrs['test'] = 42 + ds = f.create_dataset('data', shape=(1000, 1000), dtype='f4') + ds[...] = np.random.rand(1000, 1000) + + +def test_read(): + f = lindi.LindiH5pyFile.from_lindi_file('test.lindi', mode='r') + print(f.attrs['test']) + print(f['data'][0, 0]) + f.close() + + +if __name__ == "__main__": + write_lindi_binary() + test_read() diff --git a/lindi/LindiH5pyFile/LindiH5pyFile.py b/lindi/LindiH5pyFile/LindiH5pyFile.py index 93c85d2..adb5ef2 100644 --- a/lindi/LindiH5pyFile/LindiH5pyFile.py +++ b/lindi/LindiH5pyFile/LindiH5pyFile.py @@ -20,6 +20,9 @@ from ..LindiH5ZarrStore._util import _write_rfs_to_file +from ..tar.lindi_tar import LindiTarFile +from ..tar.LindiTarStore import LindiTarStore + LindiFileMode = Literal["r", "r+", "w", "w-", "x", "a"] @@ -29,7 +32,7 @@ class LindiH5pyFile(h5py.File): - def __init__(self, _zarr_group: zarr.Group, *, _zarr_store: Union[ZarrStore, None] = None, _mode: LindiFileMode = "r", _local_cache: Union[LocalCache, None] = None, _local_file_path: Union[str, None] = None): + def __init__(self, _zarr_group: zarr.Group, *, _zarr_store: Union[ZarrStore, None] = None, _mode: LindiFileMode = "r", _local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_tar_file: Union[LindiTarFile, None] = None): """ Do not use this constructor directly. Instead, use: from_lindi_file, from_h5py_file, from_reference_file_system, from_zarr_store, or @@ -40,22 +43,25 @@ def __init__(self, _zarr_group: zarr.Group, *, _zarr_store: Union[ZarrStore, Non self._mode: LindiFileMode = _mode self._the_group = LindiH5pyGroup(_zarr_group, self) self._local_cache = _local_cache - self._local_file_path = _local_file_path + self._source_url_or_path = _source_url_or_path + self._source_tar_file = _source_tar_file # see comment in LindiH5pyGroup self._id = f'{id(self._zarr_group)}/' @staticmethod - def from_lindi_file(url_or_path: str, *, mode: LindiFileMode = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None, local_file_path: Union[str, None] = None): + def from_lindi_file(url_or_path: str, *, mode: LindiFileMode = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None): """ Create a LindiH5pyFile from a URL or path to a .lindi.json file. For a description of parameters, see from_reference_file_system(). 
""" - if local_file_path is None: - if not url_or_path.startswith("http://") and not url_or_path.startswith("https://"): - local_file_path = url_or_path - return LindiH5pyFile.from_reference_file_system(url_or_path, mode=mode, staging_area=staging_area, local_cache=local_cache, local_file_path=local_file_path) + return LindiH5pyFile.from_reference_file_system( + url_or_path, + mode=mode, + staging_area=staging_area, + local_cache=local_cache + ) @staticmethod def from_hdf5_file( @@ -99,7 +105,7 @@ def from_hdf5_file( ) @staticmethod - def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMode = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None, local_file_path: Union[str, None] = None): + def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMode = "r", staging_area: Union[StagingArea, None] = None, local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_tar_file: Union[LindiTarFile, None] = None): """ Create a LindiH5pyFile from a reference file system. @@ -116,11 +122,10 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMo is only used in write mode, by default None. local_cache : Union[LocalCache, None], optional The local cache to use for caching data, by default None. - local_file_path : Union[str, None], optional - If rfs is not a string or is a remote url, this is the path to the - local file for the purpose of writing to it. It is required in this - case if mode is not "r". If rfs is a string and not a remote url, it - must be equal to local_file_path if provided. + _source_url_or_path : Union[str, None], optional + Internal use only + _source_tar_file : Union[LindiTarFile, None], optional + Internal use only """ if rfs is None: rfs = { @@ -132,25 +137,23 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMo } if isinstance(rfs, str): + if _source_url_or_path is not None: + raise Exception("_source_file_path is not None even though rfs is a string") + if _source_tar_file is not None: + raise Exception("_source_tar_file is not None even though rfs is a string") rfs_is_url = rfs.startswith("http://") or rfs.startswith("https://") - if local_file_path is not None and not rfs_is_url and rfs != local_file_path: - raise Exception(f"rfs is not a remote url, so local_file_path must be the same as rfs, but got: {rfs} and {local_file_path}") if rfs_is_url: - with tempfile.TemporaryDirectory() as tmpdir: - filename = f"{tmpdir}/temp.lindi.json" - _download_file(rfs, filename) - with open(filename, "r") as f: - data = json.load(f) - assert isinstance(data, dict) # prevent infinite recursion - return LindiH5pyFile.from_reference_file_system(data, mode=mode, staging_area=staging_area, local_cache=local_cache, local_file_path=local_file_path) + data, tar_file = _load_rfs_from_url(rfs) + return LindiH5pyFile.from_reference_file_system( + data, + mode=mode, + staging_area=staging_area, + local_cache=local_cache, + _source_tar_file=tar_file + ) else: - empty_rfs = { - "refs": { - '.zgroup': { - 'zarr_format': 2 - } - }, - } + # local file + need_to_create_empty_file = False if mode == "r": # Readonly, file must exist (default) if not os.path.exists(rfs): @@ -161,36 +164,59 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: LindiFileMo raise Exception(f"File does not exist: {rfs}") elif mode == "w": # Create file, truncate if exists - with open(rfs, "w") as f: - json.dump(empty_rfs, f) 
+ need_to_create_empty_file = True + elif mode in ["w-", "x"]: # Create file, fail if exists if os.path.exists(rfs): raise Exception(f"File already exists: {rfs}") - with open(rfs, "w") as f: - json.dump(empty_rfs, f) + need_to_create_empty_file = True elif mode == "a": # Read/write if exists, create otherwise - if os.path.exists(rfs): - with open(rfs, "r") as f: - data = json.load(f) + if not os.path.exists(rfs): + need_to_create_empty_file = True else: raise Exception(f"Unhandled mode: {mode}") - with open(rfs, "r") as f: - data = json.load(f) + if need_to_create_empty_file: + tar = rfs.endswith(".tar") or rfs.endswith(".lindi") + _create_empty_lindi_file(rfs, tar=tar) + data, tar_file = _load_rfs_from_local_file(rfs) assert isinstance(data, dict) # prevent infinite recursion - return LindiH5pyFile.from_reference_file_system(data, mode=mode, staging_area=staging_area, local_cache=local_cache, local_file_path=local_file_path) + return LindiH5pyFile.from_reference_file_system( + data, + mode=mode, + staging_area=staging_area, + local_cache=local_cache, + _source_url_or_path=rfs, + _source_tar_file=tar_file + ) elif isinstance(rfs, dict): # This store does not need to be closed - store = LindiReferenceFileSystemStore(rfs, local_cache=local_cache) + store = LindiReferenceFileSystemStore( + rfs, + local_cache=local_cache, + _source_url_or_path=_source_url_or_path, + _source_tar_file=_source_tar_file + ) + source_is_url = _source_url_or_path is not None and (_source_url_or_path.startswith("http://") or _source_url_or_path.startswith("https://")) if staging_area: + if _source_tar_file and not source_is_url: + raise Exception("Cannot use staging area when source is a local tar file") store = LindiStagingStore(base_store=store, staging_area=staging_area) - return LindiH5pyFile.from_zarr_store(store, mode=mode, local_file_path=local_file_path, local_cache=local_cache) + elif _source_url_or_path and _source_tar_file and not source_is_url: + store = LindiTarStore(base_store=store, tar_file=_source_tar_file) + return LindiH5pyFile.from_zarr_store( + store, + mode=mode, + local_cache=local_cache, + _source_url_or_path=_source_url_or_path, + _source_tar_file=_source_tar_file + ) else: raise Exception(f"Unhandled type for rfs: {type(rfs)}") @staticmethod - def from_zarr_store(zarr_store: ZarrStore, mode: LindiFileMode = "r", local_cache: Union[LocalCache, None] = None, local_file_path: Union[str, None] = None): + def from_zarr_store(zarr_store: ZarrStore, mode: LindiFileMode = "r", local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_tar_file: Union[LindiTarFile, None] = None): """ Create a LindiH5pyFile from a zarr store. 
@@ -207,10 +233,10 @@ def from_zarr_store(zarr_store: ZarrStore, mode: LindiFileMode = "r", local_cach
         # does not need to be closed
         zarr_group = zarr.open(store=zarr_store, mode=mode)
         assert isinstance(zarr_group, zarr.Group)
-        return LindiH5pyFile.from_zarr_group(zarr_group, _zarr_store=zarr_store, mode=mode, local_cache=local_cache, local_file_path=local_file_path)
+        return LindiH5pyFile.from_zarr_group(zarr_group, _zarr_store=zarr_store, mode=mode, local_cache=local_cache, _source_url_or_path=_source_url_or_path, _source_tar_file=_source_tar_file)
 
     @staticmethod
-    def from_zarr_group(zarr_group: zarr.Group, *, mode: LindiFileMode = "r", _zarr_store: Union[ZarrStore, None] = None, local_cache: Union[LocalCache, None] = None, local_file_path: Union[str, None] = None):
+    def from_zarr_group(zarr_group: zarr.Group, *, mode: LindiFileMode = "r", _zarr_store: Union[ZarrStore, None] = None, local_cache: Union[LocalCache, None] = None, _source_url_or_path: Union[str, None] = None, _source_tar_file: Union[LindiTarFile, None] = None):
         """
         Create a LindiH5pyFile from a zarr group.
 
@@ -228,7 +254,7 @@ def from_zarr_group(zarr_group: zarr.Group, *, mode: LindiFileMode = "r", _zarr_
 
         See from_zarr_store().
         """
-        return LindiH5pyFile(zarr_group, _zarr_store=_zarr_store, _mode=mode, _local_cache=local_cache, _local_file_path=local_file_path)
+        return LindiH5pyFile(zarr_group, _zarr_store=_zarr_store, _mode=mode, _local_cache=local_cache, _source_url_or_path=_source_url_or_path, _source_tar_file=_source_tar_file)
 
     def to_reference_file_system(self):
         """
@@ -241,6 +267,8 @@ def to_reference_file_system(self):
         if isinstance(zarr_store, LindiStagingStore):
             zarr_store.consolidate_chunks()
             zarr_store = zarr_store._base_store
+        if isinstance(zarr_store, LindiTarStore):
+            zarr_store = zarr_store._base_store
         if isinstance(zarr_store, LindiH5ZarrStore):
             return zarr_store.to_reference_file_system()
         if not isinstance(zarr_store, LindiReferenceFileSystemStore):
@@ -358,9 +386,17 @@ def close(self):
         self.flush()
 
     def flush(self):
-        if self._mode != 'r' and self._local_file_path is not None:
+        if self._mode != 'r' and self._source_url_or_path is not None:
+            is_url = self._source_url_or_path.startswith("http://") or self._source_url_or_path.startswith("https://")
+            if is_url:
+                raise Exception("Cannot write to URL")
             rfs = self.to_reference_file_system()
-            _write_rfs_to_file(rfs=rfs, output_file_name=self._local_file_path)
+            if self._source_tar_file:
+                self._source_tar_file.write_rfs(rfs)
+            else:
+                _write_rfs_to_file(rfs=rfs, output_file_name=self._source_url_or_path)
 
     def __enter__(self):  # type: ignore
         return self
@@ -611,3 +647,104 @@ def _format_size_bytes(size_bytes: int) -> str:
         return f"{size_bytes / 1024 / 1024:.1f} MB"
     else:
         return f"{size_bytes / 1024 / 1024 / 1024:.1f} GB"
+
+
+def _load_rfs_from_url(url: str):
+    file_size = _get_file_size_of_remote_file(url)
+    if file_size < 1024 * 1024 * 2:
+        # if it's a small file, we'll just download the whole thing
+        with tempfile.TemporaryDirectory() as tmpdir:
+            tmp_fname = f"{tmpdir}/temp.lindi.json"
+            _download_file(url, tmp_fname)
+            data, tar_file = _load_rfs_from_local_file(tmp_fname)
+            return data, tar_file
+    else:
+        # if it's a large file, we download just the first 512 bytes (the
+        # header of the first tar entry) to determine whether it is a tar file
+        tar_entry_buf = _download_file_byte_range(url, 0, 512)
+        is_tar = _check_is_tar_header(tar_entry_buf[:512])
+        if is_tar:
+            tar_file = LindiTarFile(url)
+            rfs_json = tar_file.read_file("lindi.json")
+            rfs = json.loads(rfs_json)
+            return rfs, tar_file
+        else:
+            # In this case, it must be a regular json file
+            with tempfile.TemporaryDirectory() as tmpdir:
+                tmp_fname = f"{tmpdir}/temp.lindi.json"
+                _download_file(url, tmp_fname)
+                with open(tmp_fname, "r") as f:
+                    return json.load(f), None
+
+
+def _load_rfs_from_local_file(fname: str):
+    file_size = os.path.getsize(fname)
+    if file_size >= 512:
+        # Read first bytes to check if it's a tar file
+        with open(fname, "rb") as f:
+            tar_entry_buf = f.read(512)
+        is_tar = _check_is_tar_header(tar_entry_buf)
+        if is_tar:
+            tar_file = LindiTarFile(fname)
+            rfs_json = tar_file.read_file("lindi.json")
+            rfs = json.loads(rfs_json)
+            return rfs, tar_file
+
+    # Must be a regular json file
+    with open(fname, "r") as f:
+        return json.load(f), None
+
+
+def _check_is_tar_header(header_buf: bytes) -> bool:
+    if len(header_buf) < 512:
+        return False
+
+    # We're only going to support ustar format
+    # get the ustar indicator at bytes 257-262
+    if header_buf[257:262] == b"ustar" and header_buf[262] == 0:
+        # Note that it's unlikely but possible that a json file could have the
+        # string "ustar" at these bytes, but it would not have a null byte at
+        # byte 262
+        return True
+
+    # Check for any 0 bytes in the header
+    if b"\0" in header_buf:
+        raise Exception("Problem with lindi file: 0 byte found in header, but not ustar tar format")
+
+    return False
+
+
+def _get_file_size_of_remote_file(url: str) -> int:
+    headers = {
+        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
+    }
+    req = urllib.request.Request(url, headers=headers)
+    with urllib.request.urlopen(req) as response:
+        return int(response.headers['Content-Length'])
+
+
+def _download_file_byte_range(url: str, start: int, end: int) -> bytes:
+    headers = {
+        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3",
+        "Range": f"bytes={start}-{end - 1}"
+    }
+    req = urllib.request.Request(url, headers=headers)
+    with urllib.request.urlopen(req) as response:
+        return response.read()
+
+
+def _create_empty_lindi_file(fname: str, *, tar: bool = False):
+    empty_rfs = {
+        "refs": {
+            ".zgroup": {
+                "zarr_format": 2
+            }
+        }
+    }
+    if tar:
+        LindiTarFile.create(fname)
+        tf = LindiTarFile(fname)
+        tf.write_rfs(empty_rfs)
+    else:
+        with open(fname, "w") as f:
+            json.dump(empty_rfs, f)
diff --git a/lindi/LindiH5pyFile/LindiReferenceFileSystemStore.py b/lindi/LindiH5pyFile/LindiReferenceFileSystemStore.py
index 7d2d5fa..5bf7446 100644
--- a/lindi/LindiH5pyFile/LindiReferenceFileSystemStore.py
+++ b/lindi/LindiH5pyFile/LindiReferenceFileSystemStore.py
@@ -6,6 +6,7 @@
 from zarr.storage import Store as ZarrStore
 
 from ..LocalCache.LocalCache import ChunkTooLargeError, LocalCache
+from ..tar.lindi_tar import LindiTarFile
 
 
 class LindiReferenceFileSystemStore(ZarrStore):
@@ -69,7 +70,15 @@ class LindiReferenceFileSystemStore(ZarrStore):
     It is okay for rfs to be modified outside of this class, and the changes
     will be reflected immediately in the store.
""" - def __init__(self, rfs: dict, *, mode: Literal["r", "r+"] = "r+", local_cache: Union[LocalCache, None] = None): + def __init__( + self, + rfs: dict, + *, + mode: Literal["r", "r+"] = "r+", + local_cache: Union[LocalCache, None] = None, + _source_url_or_path: Union[str, None] = None, + _source_tar_file: Union[LindiTarFile, None] = None + ): """ Create a LindiReferenceFileSystemStore. @@ -114,6 +123,8 @@ def __init__(self, rfs: dict, *, mode: Literal["r", "r+"] = "r+", local_cache: U self.rfs = rfs self.mode = mode self.local_cache = local_cache + self._source_file_path = _source_url_or_path + self._source_tar_file = _source_tar_file # These methods are overridden from MutableMapping def __contains__(self, key: object): @@ -145,22 +156,44 @@ def _get_helper(self, key: str): elif isinstance(x, list): if len(x) != 3: raise Exception("list must have 3 elements") # pragma: no cover - url = x[0] + url_or_path = x[0] offset = x[1] length = x[2] - if '{{' in url and '}}' in url and 'templates' in self.rfs: + if '{{' in url_or_path and '}}' in url_or_path and 'templates' in self.rfs: for k, v in self.rfs["templates"].items(): - url = url.replace("{{" + k + "}}", v) - if self.local_cache is not None: - x = self.local_cache.get_remote_chunk(url=url, offset=offset, size=length) - if x is not None: - return x - val = _read_bytes_from_url_or_path(url, offset, length) - if self.local_cache is not None: - try: - self.local_cache.put_remote_chunk(url=url, offset=offset, size=length, data=val) - except ChunkTooLargeError: - print(f'Warning: unable to cache chunk of size {length} on LocalCache (key: {key})') + url_or_path = url_or_path.replace("{{" + k + "}}", v) + is_url = url_or_path.startswith('http://') or url_or_path.startswith('https://') + if url_or_path.startswith('./'): + if self._source_file_path is None: + raise Exception(f"Cannot resolve relative path {url_or_path} without source file path") + if self._source_tar_file is None: + raise Exception(f"Cannot resolve relative path {url_or_path} without source file type") + if self._source_tar_file: + start_byte, end_byte = self._source_tar_file.get_file_byte_range(file_name=url_or_path[2:]) + if start_byte + offset + length > end_byte: + raise Exception(f"Chunk {key} is out of bounds in tar file {url_or_path}") + url_or_path = self._source_file_path + offset = offset + start_byte + else: + if is_url: + raise Exception(f"Cannot resolve relative path {url_or_path} for URL that is not a tar") + else: + source_file_parent_dir = '/'.join(self._source_file_path.split('/')[:-1]) + abs_path = source_file_parent_dir + '/' + url_or_path[2:] + url_or_path = abs_path + if is_url: + if self.local_cache is not None: + x = self.local_cache.get_remote_chunk(url=url_or_path, offset=offset, size=length) + if x is not None: + return x + val = _read_bytes_from_url_or_path(url_or_path, offset, length) + if self.local_cache is not None: + try: + self.local_cache.put_remote_chunk(url=url_or_path, offset=offset, size=length, data=val) + except ChunkTooLargeError: + print(f'Warning: unable to cache chunk of size {length} on LocalCache (key: {key})') + else: + val = _read_bytes_from_url_or_path(url_or_path, offset, length) return val else: # should not happen given checks in __init__, but self.rfs is mutable diff --git a/lindi/tar/LindiTarStore.py b/lindi/tar/LindiTarStore.py new file mode 100644 index 0000000..8bd0f87 --- /dev/null +++ b/lindi/tar/LindiTarStore.py @@ -0,0 +1,75 @@ +import random +from zarr.storage import Store as ZarrStore +from 
..LindiH5pyFile.LindiReferenceFileSystemStore import LindiReferenceFileSystemStore +from .lindi_tar import LindiTarFile + + +class LindiTarStore(ZarrStore): + def __init__(self, *, base_store: LindiReferenceFileSystemStore, tar_file: LindiTarFile): + self._base_store = base_store + self._tar_file = tar_file + + def __getitem__(self, key: str): + return self._base_store.__getitem__(key) + + def __setitem__(self, key: str, value: bytes): + key_parts = key.split("/") + key_base_name = key_parts[-1] + if key_base_name.startswith('.') or key_base_name.endswith('.json'): # always inline .zattrs, .zgroup, .zarray, zarr.json + inline = True + else: + # presumably it is a chunk of an array + if not isinstance(value, bytes): + raise ValueError("Value must be bytes") + size = len(value) + inline = size < 1000 # this should be a configurable threshold + if inline: + # If inline, save in memory + return self._base_store.__setitem__(key, value) + else: + # If not inline, save it as a new file in the tar file + key_without_initial_slash = key if not key.startswith("/") else key[1:] + random_string = _create_random_string(8) + fname_in_tar = f'blobs/{random_string}/{key_without_initial_slash}' + self._tar_file.write_file(fname_in_tar, value) + + self._set_ref_reference(key_without_initial_slash, f'./{fname_in_tar}', 0, len(value)) + + def __delitem__(self, key: str): + # We don't actually delete the file from the tar, but maybe it would be + # smart to put it in .trash in the future + return self._base_store.__delitem__(key) + + def __iter__(self): + return self._base_store.__iter__() + + def __len__(self): + return self._base_store.__len__() + + # These methods are overridden from BaseStore + def is_readable(self): + return True + + def is_writeable(self): + return True + + def is_listable(self): + return True + + def is_erasable(self): + return False + + def _set_ref_reference(self, key: str, filename: str, offset: int, size: int): + rfs = self._base_store.rfs + if 'refs' not in rfs: + # this shouldn't happen, but we'll be defensive + rfs['refs'] = {} + rfs['refs'][key] = [ + filename, + offset, + size + ] + + +def _create_random_string(num_chars: int) -> str: + return ''.join(random.choices("abcdefghijklmnopqrstuvwxyz", k=num_chars)) diff --git a/lindi/tar/lindi_tar.py b/lindi/tar/lindi_tar.py new file mode 100644 index 0000000..5b666fa --- /dev/null +++ b/lindi/tar/lindi_tar.py @@ -0,0 +1,281 @@ +import json +import tarfile +import random +import io +import urllib.request + + +TAR_ENTRY_JSON_SIZE = 1024 +INITIAL_TAR_INDEX_JSON_SIZE = 1024 * 256 +INITIAL_LINDI_JSON_SIZE = 1024 * 256 + + +def _pad_bytes_to_leave_room_for_growth(x: str, initial_size: int) -> bytes: + total_size = initial_size + while total_size < len(x) * 4: + total_size *= 2 + padding = b" " * (total_size - len(x)) + return x.encode() + padding + + +def _fix_checksum_in_header(f, header_start_byte): + f.seek(header_start_byte) + header = f.read(512) + + # From https://en.wikipedia.org/wiki/Tar_(computing) + # The checksum is calculated by taking the sum of the unsigned byte values + # of the header record with the eight checksum bytes taken to be ASCII + # spaces (decimal value 32). It is stored as a six digit octal number with + # leading zeroes followed by a NUL and then a space. Various implementations + # do not adhere to this format. In addition, some historic tar + # implementations treated bytes as signed. 
Implementations typically + # calculate the checksum both ways, and treat it as good if either the + # signed or unsigned sum matches the included checksum. + + header_byte_list = [] + for byte in header: + header_byte_list.append(byte) + for i in range(148, 156): + header_byte_list[i] = 32 + sum = 0 + for byte in header_byte_list: + sum += byte + checksum = oct(sum).encode()[2:] + while len(checksum) < 6: + checksum = b"0" + checksum + checksum += b"\0 " + f.seek(header_start_byte + 148) + f.write(checksum) + + +def _create_random_string(): + return "".join(random.choices("abcdefghijklmnopqrstuvwxyz", k=10)) + + +class LindiTarFile: + def __init__(self, tar_file_path: str): + self._tar_file_path = tar_file_path + + with open(self._tar_file_path, "rb") as f: + # Load the entry json + f.seek(512) + entry_json = f.read(TAR_ENTRY_JSON_SIZE) + entry = json.loads(entry_json) + index_info = entry['index'] + + # Load the index json + f.seek(index_info['d']) + index_json = f.read(index_info['s']) + self._index = json.loads(index_json) + + def get_file_info(self, file_name: str): + for file in self._index['files']: + if file['n'] == file_name: + return file + return None + + def overwrite_file_content(self, file_name: str, data: bytes): + info = self.get_file_info(file_name) + if info is None: + raise FileNotFoundError(f"File {file_name} not found") + if info['s'] != len(data): + raise ValueError("Unexpected problem in overwrite_file_content(): data size must match the size of the existing file") + with open(self._tar_file_path, "r+b") as f: + f.seek(info['d']) + f.write(data) + + def trash_file(self, file_name: str, do_write_index=True): + info = self.get_file_info(file_name) + if info is None: + raise FileNotFoundError(f"File {file_name} not found") + zeros = b"-" * info['s'] + with open(self._tar_file_path, "r+b") as f: + f.seek(info['d']) + f.write(zeros) + self._change_name_of_file(file_name, f'.trash/{file_name}.{_create_random_string()}', do_write_index=do_write_index) + + def write_rfs(self, rfs: dict): + rfs_json = json.dumps(rfs, indent=2, sort_keys=True) + + existing_lindi_json_info = self.get_file_info("lindi.json") + if existing_lindi_json_info is not None: + file_size = existing_lindi_json_info['s'] + if file_size >= len(rfs_json): + # We are going to overwrite the existing lindi.json with the new + # one. But first we pad it with spaces to the same size as the + # existing one. + padding = b" " * (file_size - len(rfs_json)) + rfs_json = rfs_json.encode() + padding + self.overwrite_file_content("lindi.json", rfs_json) + else: + # In this case we need to trash the existing file and write a new one + # at the end of the tar file. + self.trash_file("lindi.json") + rfs_json = _pad_bytes_to_leave_room_for_growth(rfs_json, INITIAL_LINDI_JSON_SIZE) + self.write_file("lindi.json", rfs_json) + else: + # We are writing a new lindi.json. 
+            rfs_json = _pad_bytes_to_leave_room_for_growth(rfs_json, INITIAL_LINDI_JSON_SIZE)
+            self.write_file("lindi.json", rfs_json)
+
+    def get_file_byte_range(self, file_name: str) -> tuple:
+        info = self.get_file_info(file_name)
+        if info is None:
+            raise FileNotFoundError(f"File {file_name} not found in tar file")
+        return info['d'], info['d'] + info['s']
+
+    def _change_name_of_file(self, file_name: str, new_file_name: str, do_write_index=True):
+        info = self.get_file_info(file_name)
+        if info is None:
+            raise FileNotFoundError(f"File {file_name} not found")
+        header_start_byte = info['o']
+        file_name_byte_range = (header_start_byte + 0, header_start_byte + 100)
+        file_name_prefix_byte_range = (header_start_byte + 345, header_start_byte + 345 + 155)
+        with open(self._tar_file_path, "r+b") as f:
+            f.seek(file_name_byte_range[0])
+            f.write(new_file_name.encode())
+            # set the rest of the field to zeros
+            f.write(b"\0" * (file_name_byte_range[1] - file_name_byte_range[0] - len(new_file_name)))
+
+            f.seek(file_name_prefix_byte_range[0])
+            f.write(b"\0" * (file_name_prefix_byte_range[1] - file_name_prefix_byte_range[0]))
+
+            _fix_checksum_in_header(f, header_start_byte)
+        try:
+            file_in_index = next(file for file in self._index['files'] if file['n'] == file_name)
+        except StopIteration:
+            raise ValueError(f"File {file_name} not found in index")
+        file_in_index['n'] = new_file_name
+        if do_write_index:
+            self._update_index()
+
+    def write_file(self, file_name: str, data: bytes):
+        if self._is_remote:
+            raise Exception("Cannot write to a remote tar file")
+        with tarfile.open(self._tar_file_path, "a") as tar:
+            tarinfo = tarfile.TarInfo(name=file_name)
+            tarinfo.size = len(data)
+            fileobj = io.BytesIO(data)
+            tar.addfile(tarinfo, fileobj)
+        with tarfile.open(self._tar_file_path, "r") as tar:
+            # TODO: do not call getmember here, because it may be slow.
+            # Instead, parse the header of the new file directly and get the
+            # offset from there.
+            info = tar.getmember(file_name)
+            self._index['files'].append({
+                'n': file_name,
+                'o': info.offset,
+                'd': info.offset_data,
+                's': info.size
+            })
+        self._update_index()
+
+    def read_file(self, file_name: str) -> bytes:
+        info = self.get_file_info(file_name)
+        if info is None:
+            raise FileNotFoundError(f"File {file_name} not found")
+        start_byte = info['d']
+        size = info['s']
+        if self._is_remote:
+            return _download_file_byte_range(self._tar_file_path, start_byte, start_byte + size)
+        with open(self._tar_file_path, "rb") as f:
+            f.seek(start_byte)
+            return f.read(size)
+
+    @staticmethod
+    def create(fname: str):
+        with tarfile.open(fname, "w") as tar:
+            # write the initial entry file; this MUST be the first file in the
+            # tar file
+            tar_entry_json_name = ".tar_entry.json"
+            tarinfo = tarfile.TarInfo(name=tar_entry_json_name)
+            tarinfo.size = TAR_ENTRY_JSON_SIZE
+            tar.addfile(tarinfo, io.BytesIO(b" " * TAR_ENTRY_JSON_SIZE))
+
+            # write the initial index file; this will start as the second file
+            # in the tar file, but as it grows it will be replaced. Importantly,
+            # the entry will always be the first file.
+            tar_index_json_size = INITIAL_TAR_INDEX_JSON_SIZE
+            tar_index_json_name = ".tar_index.json"
+            tarinfo = tarfile.TarInfo(name=tar_index_json_name)
+            tarinfo.size = tar_index_json_size
+            tar.addfile(tarinfo, io.BytesIO(b" " * tar_index_json_size))
+
+        # It seems that we need to close and then open it again in order
+        # to get the correct data offsets for the files.
+        with tarfile.open(fname, "r") as tar:
+            tar_entry_json_info = tar.getmember(tar_entry_json_name)
+            tar_index_json_info = tar.getmember(tar_index_json_name)
+
+        # fill the entry file
+        initial_entry_json = json.dumps({
+            'index': {
+                'n': tar_index_json_name,
+                'o': tar_index_json_info.offset,
+                'd': tar_index_json_info.offset_data,
+                's': tar_index_json_info.size
+            }
+        }, indent=2, sort_keys=True)
+        initial_entry_json = initial_entry_json.encode() + b" " * (tar_entry_json_info.size - len(initial_entry_json))
+        with open(fname, "r+b") as f:
+            f.seek(tar_entry_json_info.offset_data)
+            f.write(initial_entry_json)
+
+        # fill the index file
+        initial_index_json = json.dumps({
+            'files': [
+                {
+                    'n': info.name,
+                    'o': info.offset,
+                    'd': info.offset_data,
+                    's': info.size
+                }
+                for info in [tar_entry_json_info, tar_index_json_info]
+            ]
+        }, indent=2, sort_keys=True)
+        initial_index_json = initial_index_json.encode() + b" " * (tar_index_json_size - len(initial_index_json))
+        with open(fname, "r+b") as f:
+            f.seek(tar_index_json_info.offset_data)
+            f.write(initial_index_json)
+
+    def _update_index(self):
+        existing_index_json = self.read_file(".tar_index.json")
+        new_index_json = json.dumps(self._index, indent=2, sort_keys=True)
+        if len(new_index_json) <= len(existing_index_json):
+            # we can overwrite the existing index file
+            new_index_json = new_index_json.encode() + b" " * (len(existing_index_json) - len(new_index_json))
+            self.overwrite_file_content(".tar_index.json", new_index_json)
+        else:
+            # we must create a new index file
+            self.trash_file(".tar_index.json", do_write_index=False)
+
+            # after we trash the file, the index has changed once again
+            new_index_json = json.dumps(self._index, indent=2, sort_keys=True)
+
+            new_index_json = _pad_bytes_to_leave_room_for_growth(new_index_json, INITIAL_TAR_INDEX_JSON_SIZE)
+            self.write_file(".tar_index.json", new_index_json)
+
+        # now we need to update the entry file
+        tar_index_info = self.get_file_info(".tar_index.json")
+        if tar_index_info is None:
+            raise ValueError("tar_index_info is None")
+        new_entry_json = json.dumps({
+            'index': {
+                'n': tar_index_info['n'],
+                'o': tar_index_info['o'],
+                'd': tar_index_info['d'],
+                's': tar_index_info['s']
+            }
+        }, indent=2, sort_keys=True)
+        new_entry_json = new_entry_json.encode() + b" " * (TAR_ENTRY_JSON_SIZE - len(new_entry_json))
+        with open(self._tar_file_path, "r+b") as f:
+            # we assume the first file is the entry file, and we assume the header is 512 bytes
+            # this is to avoid calling the potentially expensive getmember() method
+            f.seek(512)
+            f.write(new_entry_json)
+
+
+def _download_file_byte_range(url: str, start: int, end: int) -> bytes:
+    headers = {
+        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3",
+        "Range": f"bytes={start}-{end - 1}"
+    }
+    req = urllib.request.Request(url, headers=headers)
+    with urllib.request.urlopen(req) as response:
+        return response.read()
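
A minimal end-to-end sketch of how the pieces in this patch fit together
(illustrative only; it mirrors examples/write_lindi_binary.py, and the file
name 'example.lindi' is arbitrary):

    import numpy as np
    import lindi
    from lindi.tar.lindi_tar import LindiTarFile

    # Write a tar-based .lindi file through the h5py-like API. Metadata keys
    # are inlined into lindi.json; larger chunks are appended to the tar as
    # blob files (see LindiTarStore.__setitem__).
    with lindi.LindiH5pyFile.from_lindi_file('example.lindi', mode='w') as f:
        ds = f.create_dataset('data', shape=(100, 100), dtype='f4')
        ds[...] = np.random.rand(100, 100)

    # Inspect the container with the low-level helper: lindi.json holds the
    # reference file system, and .tar_entry.json / .tar_index.json locate it.
    tf = LindiTarFile('example.lindi')
    print(tf.read_file('lindi.json').decode())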